Live: remove hardcoded frontend channel support (#41584)

This commit is contained in:
Ryan McKinley
2021-11-15 07:54:14 -08:00
committed by GitHub
parent bfec79a7dc
commit c780854a18
19 changed files with 54 additions and 520 deletions

View File

@ -9,7 +9,6 @@ import { DataFrame, DataFrameDTO } from './dataFrame';
import { RawTimeRange, TimeRange } from './time'; import { RawTimeRange, TimeRange } from './time';
import { ScopedVars } from './ScopedVars'; import { ScopedVars } from './ScopedVars';
import { CoreApp } from './app'; import { CoreApp } from './app';
import { LiveChannelSupport } from './live';
import { CustomVariableSupport, DataSourceVariableSupport, StandardVariableSupport } from './variables'; import { CustomVariableSupport, DataSourceVariableSupport, StandardVariableSupport } from './variables';
import { makeClassES5Compatible } from '../utils/makeClassES5Compatible'; import { makeClassES5Compatible } from '../utils/makeClassES5Compatible';
import { DataQuery } from './query'; import { DataQuery } from './query';
@ -343,15 +342,6 @@ abstract class DataSourceApi<
*/ */
annotationQuery?(options: AnnotationQueryRequest<TQuery>): Promise<AnnotationEvent[]>; annotationQuery?(options: AnnotationQueryRequest<TQuery>): Promise<AnnotationEvent[]>;
/**
* Define live streaming behavior within this datasource settings
*
* Note: `plugin.json` must also define `live: true`
*
* @alpha -- experimental
*/
channelSupport?: LiveChannelSupport;
/** /**
* Defines new variable support * Defines new variable support
* @alpha -- experimental * @alpha -- experimental

View File

@ -25,31 +25,6 @@ export enum LiveChannelType {
JSON = 'json', // arbitray json message JSON = 'json', // arbitray json message
} }
/**
* @alpha -- experimental
*/
export interface LiveChannelConfig {
/**
* An optional description for the channel
*/
description?: string;
/**
* What kind of data do you expect
*/
type?: LiveChannelType;
/**
* The channel keeps track of who else is connected to the same channel
*/
hasPresence?: boolean;
/**
* Allow users to write to the connection
*/
canPublish?: boolean;
}
export enum LiveChannelConnectionState { export enum LiveChannelConnectionState {
/** The connection is not yet established */ /** The connection is not yet established */
Pending = 'pending', Pending = 'pending',
@ -206,13 +181,3 @@ export function toLiveChannelId(addr: LiveChannelAddress): string {
} }
return id + '/' + addr.path; return id + '/' + addr.path;
} }
/**
* @alpha -- experimental
*/
export interface LiveChannelSupport {
/**
* Get the channel handler for the path, or throw an error if invalid
*/
getChannelConfig(path: string): LiveChannelConfig | undefined;
}

View File

@ -1,6 +1,5 @@
import { ComponentClass } from 'react'; import { ComponentClass } from 'react';
import { KeyValue } from './data'; import { KeyValue } from './data';
import { LiveChannelSupport } from './live';
/** Describes plugins life cycle status */ /** Describes plugins life cycle status */
export enum PluginState { export enum PluginState {
@ -170,13 +169,6 @@ export class GrafanaPlugin<T extends PluginMeta = PluginMeta> {
// This is set if the plugin system had errors loading the plugin // This is set if the plugin system had errors loading the plugin
loadError?: boolean; loadError?: boolean;
/**
* Live streaming support
*
* Note: `plugin.json` must also define `live: true`
*/
channelSupport?: LiveChannelSupport;
// Config control (app/datasource) // Config control (app/datasource)
angularConfigCtrl?: any; angularConfigCtrl?: any;
@ -193,10 +185,10 @@ export class GrafanaPlugin<T extends PluginMeta = PluginMeta> {
} }
/** /**
* Specify how the plugin should support paths within the live streaming environment * @deprecated -- this is no longer necessary and will be removed
*/ */
setChannelSupport(support: LiveChannelSupport) { setChannelSupport(support: any) {
this.channelSupport = support; console.warn('[deprecation] plugin is using ignored option: setChannelSupport', this.meta);
return this; return this;
} }

View File

@ -2,7 +2,6 @@ import {
DataFrame, DataFrame,
DataQueryResponse, DataQueryResponse,
LiveChannelAddress, LiveChannelAddress,
LiveChannelConfig,
LiveChannelEvent, LiveChannelEvent,
LiveChannelPresenceStatus, LiveChannelPresenceStatus,
StreamingFrameOptions, StreamingFrameOptions,
@ -36,16 +35,6 @@ export interface GrafanaLiveSrv {
*/ */
getConnectionState(): Observable<boolean>; getConnectionState(): Observable<boolean>;
/**
* Get a channel. If the scope, namespace, or path is invalid, a shutdown
* channel will be returned with an error state indicated in its status.
*
* This is a singleton instance that stays active until explicitly shutdown.
* Multiple requests for this channel will return the same object until
* the channel is shutdown
*/
getChannelInfo(address: LiveChannelAddress): Promise<LiveChannelConfig>;
/** /**
* Watch for messages in a channel * Watch for messages in a channel
*/ */

View File

@ -1,5 +1,4 @@
import { import {
LiveChannelConfig,
LiveChannelStatusEvent, LiveChannelStatusEvent,
LiveChannelEvent, LiveChannelEvent,
LiveChannelEventType, LiveChannelEventType,
@ -7,6 +6,7 @@ import {
LiveChannelPresenceStatus, LiveChannelPresenceStatus,
LiveChannelAddress, LiveChannelAddress,
DataFrameJSON, DataFrameJSON,
isValidLiveChannelAddress,
} from '@grafana/data'; } from '@grafana/data';
import Centrifuge, { import Centrifuge, {
JoinLeaveContext, JoinLeaveContext,
@ -34,10 +34,9 @@ export class CentrifugeLiveChannel<T = any> {
// Hold on to the last header with schema // Hold on to the last header with schema
lastMessageWithSchema?: DataFrameJSON; lastMessageWithSchema?: DataFrameJSON;
/** Static definition of the channel definition. This may describe the channel usage */
config?: LiveChannelConfig;
subscription?: Centrifuge.Subscription; subscription?: Centrifuge.Subscription;
shutdownCallback?: () => void; shutdownCallback?: () => void;
initalized?: boolean;
constructor(id: string, addr: LiveChannelAddress) { constructor(id: string, addr: LiveChannelAddress) {
this.id = id; this.id = id;
@ -48,14 +47,18 @@ export class CentrifugeLiveChannel<T = any> {
timestamp: this.opened, timestamp: this.opened,
state: LiveChannelConnectionState.Pending, state: LiveChannelConnectionState.Pending,
}; };
if (!isValidLiveChannelAddress(addr)) {
this.currentStatus.state = LiveChannelConnectionState.Invalid;
this.currentStatus.error = 'invalid channel address';
}
} }
// This should only be called when centrifuge is connected // This should only be called when centrifuge is connected
initalize(config: LiveChannelConfig): SubscriptionEvents { initalize(): SubscriptionEvents {
if (this.config) { if (this.initalized) {
throw new Error('Channel already initalized: ' + this.id); throw new Error('Channel already initalized: ' + this.id);
} }
this.config = config; this.initalized = true;
const events: SubscriptionEvents = { const events: SubscriptionEvents = {
// Called when a message is recieved from the socket // Called when a message is recieved from the socket
@ -108,14 +111,12 @@ export class CentrifugeLiveChannel<T = any> {
}, },
}; };
if (config.hasPresence) { events.join = (ctx: JoinLeaveContext) => {
events.join = (ctx: JoinLeaveContext) => { this.stream.next({ type: LiveChannelEventType.Join, user: ctx.info.user });
this.stream.next({ type: LiveChannelEventType.Join, user: ctx.info.user }); };
}; events.leave = (ctx: JoinLeaveContext) => {
events.leave = (ctx: JoinLeaveContext) => { this.stream.next({ type: LiveChannelEventType.Leave, user: ctx.info.user });
this.stream.next({ type: LiveChannelEventType.Leave, user: ctx.info.user }); };
};
}
return events; return events;
} }

View File

@ -10,7 +10,6 @@ import {
isLiveChannelMessageEvent, isLiveChannelMessageEvent,
isLiveChannelStatusEvent, isLiveChannelStatusEvent,
LiveChannelAddress, LiveChannelAddress,
LiveChannelConfig,
LiveChannelConnectionState, LiveChannelConnectionState,
LiveChannelEvent, LiveChannelEvent,
LiveChannelPresenceStatus, LiveChannelPresenceStatus,
@ -38,19 +37,19 @@ export interface CentrifugeSrv {
/** /**
* Watch for messages in a channel * Watch for messages in a channel
*/ */
getStream<T>(address: LiveChannelAddress, config: LiveChannelConfig): Observable<LiveChannelEvent<T>>; getStream<T>(address: LiveChannelAddress): Observable<LiveChannelEvent<T>>;
/** /**
* Connect to a channel and return results as DataFrames * Connect to a channel and return results as DataFrames
*/ */
getDataStream(options: LiveDataStreamOptions, config: LiveChannelConfig): Observable<DataQueryResponse>; getDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse>;
/** /**
* For channels that support presence, this will request the current state from the server. * For channels that support presence, this will request the current state from the server.
* *
* Join and leave messages will be sent to the open stream * Join and leave messages will be sent to the open stream
*/ */
getPresence(address: LiveChannelAddress, config: LiveChannelConfig): Promise<LiveChannelPresenceStatus>; getPresence(address: LiveChannelAddress): Promise<LiveChannelPresenceStatus>;
} }
export class CentrifugeService implements CentrifugeSrv { export class CentrifugeService implements CentrifugeSrv {
@ -110,7 +109,7 @@ export class CentrifugeService implements CentrifugeSrv {
* Get a channel. If the scope, namespace, or path is invalid, a shutdown * Get a channel. If the scope, namespace, or path is invalid, a shutdown
* channel will be returned with an error state indicated in its status * channel will be returned with an error state indicated in its status
*/ */
private getChannel<TMessage>(addr: LiveChannelAddress, config: LiveChannelConfig): CentrifugeLiveChannel<TMessage> { private getChannel<TMessage>(addr: LiveChannelAddress): CentrifugeLiveChannel<TMessage> {
const id = `${this.deps.orgId}/${addr.scope}/${addr.namespace}/${addr.path}`; const id = `${this.deps.orgId}/${addr.scope}/${addr.namespace}/${addr.path}`;
let channel = this.open.get(id); let channel = this.open.get(id);
if (channel != null) { if (channel != null) {
@ -118,13 +117,16 @@ export class CentrifugeService implements CentrifugeSrv {
} }
channel = new CentrifugeLiveChannel(id, addr); channel = new CentrifugeLiveChannel(id, addr);
if (channel.currentStatus.state === LiveChannelConnectionState.Invalid) {
return channel;
}
channel.shutdownCallback = () => { channel.shutdownCallback = () => {
this.open.delete(id); // remove it from the list of open channels this.open.delete(id); // remove it from the list of open channels
}; };
this.open.set(id, channel); this.open.set(id, channel);
// Initialize the channel in the background // Initialize the channel in the background
this.initChannel(config, channel).catch((err) => { this.initChannel(channel).catch((err) => {
if (channel) { if (channel) {
channel.currentStatus.state = LiveChannelConnectionState.Invalid; channel.currentStatus.state = LiveChannelConnectionState.Invalid;
channel.shutdownWithError(err); channel.shutdownWithError(err);
@ -136,8 +138,8 @@ export class CentrifugeService implements CentrifugeSrv {
return channel; return channel;
} }
private async initChannel(config: LiveChannelConfig, channel: CentrifugeLiveChannel): Promise<void> { private async initChannel(channel: CentrifugeLiveChannel): Promise<void> {
const events = channel.initalize(config); const events = channel.initalize();
if (!this.centrifuge.isConnected()) { if (!this.centrifuge.isConnected()) {
await this.connectionBlocker; await this.connectionBlocker;
} }
@ -159,16 +161,16 @@ export class CentrifugeService implements CentrifugeSrv {
/** /**
* Watch for messages in a channel * Watch for messages in a channel
*/ */
getStream<T>(address: LiveChannelAddress, config: LiveChannelConfig): Observable<LiveChannelEvent<T>> { getStream<T>(address: LiveChannelAddress): Observable<LiveChannelEvent<T>> {
return this.getChannel<T>(address, config).getStream(); return this.getChannel<T>(address).getStream();
} }
/** /**
* Connect to a channel and return results as DataFrames * Connect to a channel and return results as DataFrames
*/ */
getDataStream(options: LiveDataStreamOptions, config: LiveChannelConfig): Observable<DataQueryResponse> { getDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse> {
return new Observable<DataQueryResponse>((subscriber) => { return new Observable<DataQueryResponse>((subscriber) => {
const channel = this.getChannel(options.addr, config); const channel = this.getChannel(options.addr);
const key = options.key ?? `xstr/${streamCounter++}`; const key = options.key ?? `xstr/${streamCounter++}`;
let data: StreamingDataFrame | undefined = undefined; let data: StreamingDataFrame | undefined = undefined;
let filtered: DataFrame | undefined = undefined; let filtered: DataFrame | undefined = undefined;
@ -273,8 +275,8 @@ export class CentrifugeService implements CentrifugeSrv {
* *
* Join and leave messages will be sent to the open stream * Join and leave messages will be sent to the open stream
*/ */
getPresence(address: LiveChannelAddress, config: LiveChannelConfig): Promise<LiveChannelPresenceStatus> { getPresence(address: LiveChannelAddress): Promise<LiveChannelPresenceStatus> {
return this.getChannel(address, config).getPresence(); return this.getChannel(address).getPresence();
} }
} }

View File

@ -2,7 +2,7 @@ import { CentrifugeService, CentrifugeSrvDeps } from './service';
import * as comlink from 'comlink'; import * as comlink from 'comlink';
import './transferHandlers'; import './transferHandlers';
import { remoteObservableAsObservable } from './remoteObservable'; import { remoteObservableAsObservable } from './remoteObservable';
import { LiveChannelAddress, LiveChannelConfig } from '@grafana/data'; import { LiveChannelAddress } from '@grafana/data';
import { LiveDataStreamOptions } from '@grafana/runtime'; import { LiveDataStreamOptions } from '@grafana/runtime';
let centrifuge: CentrifugeService; let centrifuge: CentrifugeService;
@ -23,16 +23,16 @@ const getConnectionState = () => {
return comlink.proxy(centrifuge.getConnectionState()); return comlink.proxy(centrifuge.getConnectionState());
}; };
const getDataStream = (options: LiveDataStreamOptions, config: LiveChannelConfig) => { const getDataStream = (options: LiveDataStreamOptions) => {
return comlink.proxy(centrifuge.getDataStream(options, config)); return comlink.proxy(centrifuge.getDataStream(options));
}; };
const getStream = (address: LiveChannelAddress, config: LiveChannelConfig) => { const getStream = (address: LiveChannelAddress) => {
return comlink.proxy(centrifuge.getStream(address, config)); return comlink.proxy(centrifuge.getStream(address));
}; };
const getPresence = async (address: LiveChannelAddress, config: LiveChannelConfig) => { const getPresence = async (address: LiveChannelAddress) => {
return await centrifuge.getPresence(address, config); return await centrifuge.getPresence(address);
}; };
const workObj = { const workObj = {

View File

@ -4,7 +4,7 @@ import './transferHandlers';
import * as comlink from 'comlink'; import * as comlink from 'comlink';
import { asyncScheduler, Observable, observeOn } from 'rxjs'; import { asyncScheduler, Observable, observeOn } from 'rxjs';
import { LiveChannelAddress, LiveChannelConfig, LiveChannelEvent } from '@grafana/data'; import { LiveChannelAddress, LiveChannelEvent } from '@grafana/data';
import { promiseWithRemoteObservableAsObservable } from './remoteObservable'; import { promiseWithRemoteObservableAsObservable } from './remoteObservable';
import { createWorker } from './createCentrifugeServiceWorker'; import { createWorker } from './createCentrifugeServiceWorker';
@ -20,21 +20,21 @@ export class CentrifugeServiceWorkerProxy implements CentrifugeSrv {
return promiseWithRemoteObservableAsObservable(this.centrifugeWorker.getConnectionState()); return promiseWithRemoteObservableAsObservable(this.centrifugeWorker.getConnectionState());
}; };
getDataStream: CentrifugeSrv['getDataStream'] = (options, config) => { getDataStream: CentrifugeSrv['getDataStream'] = (options) => {
return promiseWithRemoteObservableAsObservable(this.centrifugeWorker.getDataStream(options, config)).pipe( return promiseWithRemoteObservableAsObservable(this.centrifugeWorker.getDataStream(options)).pipe(
// async scheduler splits the synchronous task of deserializing data from web worker and // async scheduler splits the synchronous task of deserializing data from web worker and
// consuming the message (ie. updating react component) into two to avoid blocking the event loop // consuming the message (ie. updating react component) into two to avoid blocking the event loop
observeOn(asyncScheduler) observeOn(asyncScheduler)
); );
}; };
getPresence: CentrifugeSrv['getPresence'] = (address, config) => { getPresence: CentrifugeSrv['getPresence'] = (address) => {
return this.centrifugeWorker.getPresence(address, config); return this.centrifugeWorker.getPresence(address);
}; };
getStream: CentrifugeSrv['getStream'] = <T>(address: LiveChannelAddress, config: LiveChannelConfig) => { getStream: CentrifugeSrv['getStream'] = <T>(address: LiveChannelAddress) => {
return promiseWithRemoteObservableAsObservable( return promiseWithRemoteObservableAsObservable(
this.centrifugeWorker.getStream(address, config) as Promise<comlink.Remote<Observable<LiveChannelEvent<T>>>> this.centrifugeWorker.getStream(address) as Promise<comlink.Remote<Observable<LiveChannelEvent<T>>>>
); );
}; };
} }

View File

@ -1,36 +0,0 @@
import { LiveChannelScope, LiveChannelSupport, SelectableValue } from '@grafana/data';
import {
grafanaLiveCoreFeatures,
GrafanaLiveDataSourceScope,
GrafanaLivePluginScope,
GrafanaLiveScope,
GrafanaLiveStreamScope,
} from './scope';
import { GrafanaLiveChannelConfigSrv, ExistingLiveChannelScope } from './types';
export class GrafanaLiveChannelConfigService implements GrafanaLiveChannelConfigSrv {
private readonly scopes: Record<LiveChannelScope, GrafanaLiveScope>;
constructor() {
this.scopes = Object.freeze({
[LiveChannelScope.Grafana]: grafanaLiveCoreFeatures,
[LiveChannelScope.DataSource]: new GrafanaLiveDataSourceScope(),
[LiveChannelScope.Plugin]: new GrafanaLivePluginScope(),
[LiveChannelScope.Stream]: new GrafanaLiveStreamScope(),
});
}
private getScope = (liveChannelScope: ExistingLiveChannelScope): GrafanaLiveScope =>
this.scopes[liveChannelScope as LiveChannelScope];
doesScopeExist = (liveChannelScope: LiveChannelScope): liveChannelScope is ExistingLiveChannelScope =>
Boolean(this.scopes[liveChannelScope]);
getChannelSupport = async (
liveChannelScope: ExistingLiveChannelScope,
namespace: string
): Promise<LiveChannelSupport | undefined> => this.getScope(liveChannelScope).getChannelSupport(namespace);
getNamespaces = async (liveChannelScope: ExistingLiveChannelScope): Promise<Array<SelectableValue<string>>> =>
this.getScope(liveChannelScope).listNamespaces();
}

View File

@ -1,182 +0,0 @@
import { LiveChannelScope, LiveChannelSupport, SelectableValue } from '@grafana/data';
import { getDataSourceSrv } from '@grafana/runtime';
import { config } from 'app/core/config';
import { loadPlugin } from 'app/features/plugins/PluginPage';
import { LiveMeasurementsSupport } from '../measurements/measurementsSupport';
import { CoreGrafanaLiveFeature } from './types';
export abstract class GrafanaLiveScope {
constructor(protected scope: LiveChannelScope) {}
/**
* Load the real namespaces
*/
abstract getChannelSupport(namespace: string): Promise<LiveChannelSupport | undefined>;
/**
* List the possible values within this scope
*/
abstract listNamespaces(): Promise<Array<SelectableValue<string>>>;
}
class GrafanaLiveCoreScope extends GrafanaLiveScope {
readonly features = new Map<string, LiveChannelSupport>();
readonly namespaces: Array<SelectableValue<string>> = [];
constructor() {
super(LiveChannelScope.Grafana);
}
register(feature: CoreGrafanaLiveFeature) {
this.features.set(feature.name, feature.support);
this.namespaces.push({
value: feature.name,
label: feature.name,
description: feature.description,
});
}
/**
* Load the real namespaces
*/
async getChannelSupport(namespace: string) {
const v = this.features.get(namespace);
if (v) {
return Promise.resolve(v);
}
throw new Error('unknown feature: ' + namespace);
}
/**
* List the possible values within this scope
*/
listNamespaces() {
return Promise.resolve(this.namespaces);
}
}
export const grafanaLiveCoreFeatures = new GrafanaLiveCoreScope();
export class GrafanaLiveDataSourceScope extends GrafanaLiveScope {
names?: Array<SelectableValue<string>>;
constructor() {
super(LiveChannelScope.DataSource);
}
/**
* Load the real namespaces
*/
async getChannelSupport(namespace: string) {
const ds = await getDataSourceSrv().get(namespace);
if (ds.channelSupport) {
return ds.channelSupport;
}
return new LiveMeasurementsSupport(); // default support?
}
/**
* List the possible values within this scope
*/
async listNamespaces() {
if (this.names) {
return Promise.resolve(this.names);
}
const names: Array<SelectableValue<string>> = [];
for (const [key, ds] of Object.entries(config.datasources)) {
if (ds.meta.live) {
try {
const s = await this.getChannelSupport(key); // ds.name or ID?
if (s) {
names.push({
label: ds.name,
value: ds.type,
description: ds.type,
});
}
} catch (err) {
err.isHandled = true;
}
}
}
return (this.names = names);
}
}
export class GrafanaLivePluginScope extends GrafanaLiveScope {
names?: Array<SelectableValue<string>>;
constructor() {
super(LiveChannelScope.Plugin);
}
/**
* Load the real namespaces
*/
async getChannelSupport(namespace: string) {
const plugin = await loadPlugin(namespace);
if (!plugin) {
throw new Error('Unknown streaming plugin: ' + namespace);
}
if (plugin.channelSupport) {
return plugin.channelSupport; // explicit
}
throw new Error('Plugin does not support streaming: ' + namespace);
}
/**
* List the possible values within this scope
*/
async listNamespaces() {
if (this.names) {
return Promise.resolve(this.names);
}
const names: Array<SelectableValue<string>> = [];
// TODO add list to config
for (const [key, panel] of Object.entries(config.panels)) {
if (panel.live) {
try {
const s = await this.getChannelSupport(key); // ds.name or ID?
if (s) {
names.push({
label: panel.name,
value: key,
description: panel.info?.description,
});
}
} catch (err) {
err.isHandled = true;
}
}
}
return (this.names = names);
}
}
export class GrafanaLiveStreamScope extends GrafanaLiveScope {
names?: Array<SelectableValue<string>>;
constructor() {
super(LiveChannelScope.Stream);
}
async getChannelSupport(namespace: string) {
return new LiveMeasurementsSupport();
}
/**
* List the possible values within this scope
*/
async listNamespaces() {
if (this.names) {
return Promise.resolve(this.names);
}
const names: Array<SelectableValue<string>> = [];
// TODO!!!
return (this.names = names);
}
}

View File

@ -1,18 +0,0 @@
import { LiveChannelScope, LiveChannelSupport, SelectableValue } from '@grafana/data';
export interface CoreGrafanaLiveFeature {
name: string;
support: LiveChannelSupport;
description: string;
}
export type ExistingLiveChannelScope = LiveChannelScope & { readonly discriminator: unique symbol };
export interface GrafanaLiveChannelConfigSrv {
doesScopeExist: (liveChannelScope: LiveChannelScope) => liveChannelScope is ExistingLiveChannelScope;
getChannelSupport: (
liveChannelScope: ExistingLiveChannelScope,
namespace: string
) => Promise<LiveChannelSupport | undefined>;
getNamespaces: (liveChannelScope: ExistingLiveChannelScope) => Promise<Array<SelectableValue<string>>>;
}

View File

@ -12,7 +12,6 @@ import {
} from '@grafana/data'; } from '@grafana/data';
import { DashboardChangedModal } from './DashboardChangedModal'; import { DashboardChangedModal } from './DashboardChangedModal';
import { DashboardEvent, DashboardEventAction } from './types'; import { DashboardEvent, DashboardEventAction } from './types';
import { CoreGrafanaLiveFeature } from '../channel-config/types';
import { sessionId } from 'app/features/live'; import { sessionId } from 'app/features/live';
import { ShowModalReactEvent } from '../../../types/events'; import { ShowModalReactEvent } from '../../../types/events';
import { Unsubscribable } from 'rxjs'; import { Unsubscribable } from 'rxjs';
@ -159,16 +158,3 @@ class DashboardWatcher {
} }
export const dashboardWatcher = new DashboardWatcher(); export const dashboardWatcher = new DashboardWatcher();
export function getDashboardChannelsFeature(): CoreGrafanaLiveFeature {
return {
name: 'dashboard',
support: {
getChannelConfig: (path: string) => ({
description: 'Dashboard change events',
hasPresence: true,
}),
},
description: 'Dashboard listener',
};
}

View File

@ -1,34 +0,0 @@
import { LiveChannelType } from '@grafana/data';
import { getDashboardChannelsFeature } from './dashboard/dashboardWatcher';
import { grafanaLiveCoreFeatures } from './channel-config/scope';
export function registerLiveFeatures() {
grafanaLiveCoreFeatures.register({
name: 'testdata',
support: {
getChannelConfig: (path: string) => {
return {
type: LiveChannelType.DataStream,
};
},
},
description: 'Test data generations',
});
grafanaLiveCoreFeatures.register({
name: 'broadcast',
support: {
getChannelConfig: (path: string) => {
return {
type: LiveChannelType.JSON,
canPublish: true,
description: 'Broadcast any messages to a channel',
};
},
},
description: 'Broadcast will send/receive any JSON object in a channel',
});
// dashboard/*
grafanaLiveCoreFeatures.register(getDashboardChannelsFeature());
}

View File

@ -1,17 +1,10 @@
import { config, getBackendSrv, getGrafanaLiveSrv, setGrafanaLiveSrv } from '@grafana/runtime'; import { config, getBackendSrv, getGrafanaLiveSrv, setGrafanaLiveSrv } from '@grafana/runtime';
import { registerLiveFeatures } from './features';
import { GrafanaLiveService } from './live'; import { GrafanaLiveService } from './live';
import { GrafanaLiveChannelConfigService } from './channel-config';
import { GrafanaLiveChannelConfigSrv } from './channel-config/types';
import { contextSrv } from '../../core/services/context_srv'; import { contextSrv } from '../../core/services/context_srv';
import { CentrifugeServiceWorkerProxy } from './centrifuge/serviceWorkerProxy'; import { CentrifugeServiceWorkerProxy } from './centrifuge/serviceWorkerProxy';
import { CentrifugeService } from './centrifuge/service'; import { CentrifugeService } from './centrifuge/service';
import { liveTimer } from 'app/features/dashboard/dashgrid/liveTimer'; import { liveTimer } from 'app/features/dashboard/dashgrid/liveTimer';
const grafanaLiveScopesSingleton = new GrafanaLiveChannelConfigService();
export const getGrafanaLiveScopes = (): GrafanaLiveChannelConfigSrv => grafanaLiveScopesSingleton;
export const sessionId = export const sessionId =
(window as any)?.grafanaBootData?.user?.id + (window as any)?.grafanaBootData?.user?.id +
'/' + '/' +
@ -35,12 +28,10 @@ export function initGrafanaLive() {
setGrafanaLiveSrv( setGrafanaLiveSrv(
new GrafanaLiveService({ new GrafanaLiveService({
scopes: getGrafanaLiveScopes(),
centrifugeSrv, centrifugeSrv,
backendSrv: getBackendSrv(), backendSrv: getBackendSrv(),
}) })
); );
registerLiveFeatures();
} }
export function getGrafanaLiveCentrifugeSrv() { export function getGrafanaLiveCentrifugeSrv() {

View File

@ -1,24 +1,16 @@
import { BackendSrv, GrafanaLiveSrv, LiveDataStreamOptions } from '@grafana/runtime'; import { BackendSrv, GrafanaLiveSrv, LiveDataStreamOptions } from '@grafana/runtime';
import { CentrifugeSrv } from './centrifuge/service'; import { CentrifugeSrv } from './centrifuge/service';
import { mergeMap, from, of, Observable } from 'rxjs'; import { Observable } from 'rxjs';
import { import {
DataQueryResponse, DataQueryResponse,
isValidLiveChannelAddress,
LiveChannelAddress, LiveChannelAddress,
LiveChannelConfig,
LiveChannelConnectionState,
LiveChannelEvent, LiveChannelEvent,
LiveChannelEventType,
LiveChannelPresenceStatus, LiveChannelPresenceStatus,
LoadingState,
toLiveChannelId, toLiveChannelId,
} from '@grafana/data'; } from '@grafana/data';
import { GrafanaLiveChannelConfigSrv } from './channel-config/types';
import { catchError } from 'rxjs/operators';
type GrafanaLiveServiceDeps = { type GrafanaLiveServiceDeps = {
scopes: GrafanaLiveChannelConfigSrv;
centrifugeSrv: CentrifugeSrv; centrifugeSrv: CentrifugeSrv;
backendSrv: BackendSrv; backendSrv: BackendSrv;
}; };
@ -37,23 +29,14 @@ export class GrafanaLiveService implements GrafanaLiveSrv {
* Connect to a channel and return results as DataFrames * Connect to a channel and return results as DataFrames
*/ */
getDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse> { getDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse> {
const channelConfig = this.getChannelInfo(options.addr); return this.deps.centrifugeSrv.getDataStream(options);
return from(channelConfig).pipe(
mergeMap((config) => this.deps.centrifugeSrv.getDataStream(options, config)),
catchError((error) => this.getInvalidDataStream(error, options))
);
} }
/** /**
* Watch for messages in a channel * Watch for messages in a channel
*/ */
getStream<T>(address: LiveChannelAddress): Observable<LiveChannelEvent<T>> { getStream<T>(address: LiveChannelAddress): Observable<LiveChannelEvent<T>> {
const channelConfig = this.getChannelInfo(address); return this.deps.centrifugeSrv.getStream<T>(address);
return from(channelConfig).pipe(
mergeMap((config) => this.deps.centrifugeSrv.getStream<T>(address, config)),
catchError((error) => this.getInvalidChannelStream<T>(error, address))
);
} }
/** /**
@ -74,55 +57,6 @@ export class GrafanaLiveService implements GrafanaLiveSrv {
* Join and leave messages will be sent to the open stream * Join and leave messages will be sent to the open stream
*/ */
async getPresence(address: LiveChannelAddress): Promise<LiveChannelPresenceStatus> { async getPresence(address: LiveChannelAddress): Promise<LiveChannelPresenceStatus> {
const channelConfig = await this.getChannelInfo(address); return this.deps.centrifugeSrv.getPresence(address);
return this.deps.centrifugeSrv.getPresence(address, channelConfig);
} }
/**
* Get a channel. If the scope, namespace, or path is invalid, a shutdown
* channel will be returned with an error state indicated in its status.
*
* This is a singleton instance that stays active until explicitly shutdown.
* Multiple requests for this channel will return the same object until
* the channel is shutdown
*/
async getChannelInfo(addr: LiveChannelAddress): Promise<LiveChannelConfig> {
if (!isValidLiveChannelAddress(addr)) {
return Promise.reject('invalid live channel address');
}
if (!this.deps.scopes.doesScopeExist(addr.scope)) {
return Promise.reject('invalid scope');
}
const support = await this.deps.scopes.getChannelSupport(addr.scope, addr.namespace);
if (!support) {
return Promise.reject(addr.namespace + ' does not support streaming');
}
return support.getChannelConfig(addr.path)!;
}
private getInvalidChannelStream = <T>(error: Error, address: LiveChannelAddress): Observable<LiveChannelEvent<T>> => {
return of({
type: LiveChannelEventType.Status,
id: `${address.scope}/${address.namespace}/${address.path}`,
timestamp: Date.now(),
state: LiveChannelConnectionState.Invalid,
error,
message: error.message,
});
};
private getInvalidDataStream = (error: Error, options: LiveDataStreamOptions): Observable<DataQueryResponse> => {
return of({
error: {
data: {
error: error.stack,
},
message: error.message,
},
state: LoadingState.Error,
data: options.frame ? [options.frame] : [],
});
};
} }

View File

@ -1,17 +0,0 @@
import { LiveChannelSupport, LiveChannelConfig, LiveChannelType } from '@grafana/data';
/**
* Generic description of channels that support streams
*
* @alpha
*/
export class LiveMeasurementsSupport implements LiveChannelSupport {
/**
* Get the channel handler for the path, or throw an error if invalid
*/
getChannelConfig(path: string): LiveChannelConfig | undefined {
return {
type: LiveChannelType.DataStream,
};
}
}

View File

@ -6,7 +6,6 @@ import { CloudWatchJsonData, CloudWatchQuery } from './types';
import { CloudWatchLogsQueryEditor } from './components/LogsQueryEditor'; import { CloudWatchLogsQueryEditor } from './components/LogsQueryEditor';
import { PanelQueryEditor } from './components/PanelQueryEditor'; import { PanelQueryEditor } from './components/PanelQueryEditor';
import LogsCheatSheet from './components/LogsCheatSheet'; import LogsCheatSheet from './components/LogsCheatSheet';
import { LiveMeasurementsSupport } from 'app/features/live/measurements/measurementsSupport';
export const plugin = new DataSourcePlugin<CloudWatchDatasource, CloudWatchQuery, CloudWatchJsonData>( export const plugin = new DataSourcePlugin<CloudWatchDatasource, CloudWatchQuery, CloudWatchJsonData>(
CloudWatchDatasource CloudWatchDatasource
@ -16,5 +15,4 @@ export const plugin = new DataSourcePlugin<CloudWatchDatasource, CloudWatchQuery
.setQueryEditor(PanelQueryEditor) .setQueryEditor(PanelQueryEditor)
.setExploreMetricsQueryField(PanelQueryEditor) .setExploreMetricsQueryField(PanelQueryEditor)
.setExploreLogsQueryField(CloudWatchLogsQueryEditor) .setExploreLogsQueryField(CloudWatchLogsQueryEditor)
.setAnnotationQueryCtrl(CloudWatchAnnotationsQueryCtrl) .setAnnotationQueryCtrl(CloudWatchAnnotationsQueryCtrl);
.setChannelSupport(new LiveMeasurementsSupport());

View File

@ -3,7 +3,6 @@ import { TestDataDataSource } from './datasource';
import { TestInfoTab } from './TestInfoTab'; import { TestInfoTab } from './TestInfoTab';
import { ConfigEditor } from './ConfigEditor'; import { ConfigEditor } from './ConfigEditor';
import { QueryEditor } from './QueryEditor'; import { QueryEditor } from './QueryEditor';
import { LiveMeasurementsSupport } from 'app/features/live/measurements/measurementsSupport';
class TestDataAnnotationsQueryCtrl { class TestDataAnnotationsQueryCtrl {
annotation: any; annotation: any;
@ -14,7 +13,6 @@ class TestDataAnnotationsQueryCtrl {
export const plugin = new DataSourcePlugin(TestDataDataSource) export const plugin = new DataSourcePlugin(TestDataDataSource)
.setConfigEditor(ConfigEditor) .setConfigEditor(ConfigEditor)
.setQueryEditor(QueryEditor) .setQueryEditor(QueryEditor)
.setChannelSupport(new LiveMeasurementsSupport())
.setAnnotationQueryCtrl(TestDataAnnotationsQueryCtrl) .setAnnotationQueryCtrl(TestDataAnnotationsQueryCtrl)
.addConfigPage({ .addConfigPage({
title: 'Setup', title: 'Setup',

View File

@ -4,14 +4,12 @@ import { Select, Alert, Label, stylesFactory } from '@grafana/ui';
import { import {
LiveChannelScope, LiveChannelScope,
LiveChannelAddress, LiveChannelAddress,
LiveChannelSupport,
SelectableValue, SelectableValue,
StandardEditorProps, StandardEditorProps,
GrafanaTheme, GrafanaTheme,
} from '@grafana/data'; } from '@grafana/data';
import { LivePanelOptions } from './types'; import { LivePanelOptions } from './types';
import { getGrafanaLiveScopes } from 'app/features/live';
import { config } from 'app/core/config'; import { config } from 'app/core/config';
type Props = StandardEditorProps<LiveChannelAddress, any, LivePanelOptions>; type Props = StandardEditorProps<LiveChannelAddress, any, LivePanelOptions>;
@ -25,7 +23,6 @@ const scopes: Array<SelectableValue<LiveChannelScope>> = [
interface State { interface State {
namespaces: Array<SelectableValue<string>>; namespaces: Array<SelectableValue<string>>;
paths: Array<SelectableValue<string>>; paths: Array<SelectableValue<string>>;
support?: LiveChannelSupport;
} }
export class LiveChannelEditor extends PureComponent<Props, State> { export class LiveChannelEditor extends PureComponent<Props, State> {
@ -44,31 +41,9 @@ export class LiveChannelEditor extends PureComponent<Props, State> {
} }
} }
async getScopeDetails() {
const { scope, namespace } = this.props.value;
const srv = getGrafanaLiveScopes();
if (!srv.doesScopeExist(scope)) {
return {
namespaces: [],
support: undefined,
};
}
const namespaces = await srv.getNamespaces(scope);
const support = namespace ? await srv.getChannelSupport(scope, namespace) : undefined;
return {
namespaces,
support,
};
}
async updateSelectOptions() { async updateSelectOptions() {
const { namespaces, support } = await this.getScopeDetails();
this.setState({ this.setState({
namespaces, namespaces: [],
support,
paths: [], paths: [],
}); });
} }