BackendSrv: Support streaming chunked responses (#98691)

Ryan McKinley
2025-01-15 10:01:22 +03:00
committed by GitHub
parent 0fce8799eb
commit 0d302a161a
9 changed files with 256 additions and 19 deletions

View File

@@ -70,6 +70,12 @@ export type BackendSrvRequest = {
*/
responseType?: 'json' | 'text' | 'arraybuffer' | 'blob';
/**
* Used to cancel an open connection
* https://developer.mozilla.org/en-US/docs/Web/API/AbortController
*/
abortSignal?: AbortSignal;
/**
* The credentials read-only property of the Request interface indicates whether the user agent should send cookies from the other domain in the case of cross-origin requests.
*/
@@ -173,6 +179,14 @@ export interface BackendSrv {
* Observable http request interface
*/
fetch<T>(options: BackendSrvRequest): Observable<FetchResponse<T>>;
/**
* Observe each raw chunk in the response. This is useful when reading values from
* a long-lived HTTP connection such as the Kubernetes WATCH command.
*
* Each emission includes the full response headers; the `data` property holds the raw chunk bytes.
*/
chunked(options: BackendSrvRequest): Observable<FetchResponse<Uint8Array | undefined>>;
}
let singletonInstance: BackendSrv;
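
For orientation, here is a minimal consumer sketch of the new API (the datasource uid and endpoint are illustrative; the same testdata stream resource is exercised later in this commit):

    import { getBackendSrv } from '@grafana/runtime';

    const decoder = new TextDecoder();
    const sub = getBackendSrv()
      .chunked({ url: 'api/datasources/uid/<uid>/resources/stream' })
      .subscribe({
        next: (rsp) => {
          // Every emission carries the full response headers; `data` is the raw
          // Uint8Array chunk (undefined once the stream is done)
          if (rsp.ok && rsp.data) {
            console.log(decoder.decode(rsp.data, { stream: true }));
          }
        },
        complete: () => console.log('stream closed'),
      });
    // sub.unsubscribe() aborts the underlying HTTP connection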

View File

@@ -4,13 +4,13 @@ import (
"encoding/json"
"fmt"
"io"
"math/rand"
"net/http"
"sort"
"strconv"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana/pkg/tsdb/grafana-testdata-datasource/kinds"
)
@@ -78,34 +78,89 @@ func (s *Service) testStreamHandler(rw http.ResponseWriter, req *http.Request) {
ctxLogger := s.logger.FromContext(req.Context())
ctxLogger.Debug("Received resource call", "url", req.URL.String(), "method", req.Method)
header := rw.Header()
header.Set("Cache-Control", "no-store")
header.Set("X-Content-Type-Options", "nosniff")
header.Set("Content-Type", "text/plain")
writeError := func(code int, message string) {
rw.WriteHeader(code)
_, _ = rw.Write([]byte(message))
}
if req.Method != http.MethodGet {
writeError(http.StatusMethodNotAllowed, "only supports get")
return
}
var err error
query := req.URL.Query()
count := 10
if query.Has("count") {
count, err = strconv.Atoi(query.Get("count"))
if err != nil {
writeError(http.StatusBadRequest, "invalid count value")
return
}
}
start := 1
if query.Has("start") {
start, err = strconv.Atoi(query.Get("start"))
if err != nil {
writeError(http.StatusBadRequest, "invalid start value")
return
}
}
flush := 100 // flush 100% of the time
if query.Has("flush") {
flush, err = strconv.Atoi(query.Get("flush"))
if err != nil {
writeError(http.StatusBadRequest, "invalid flush value")
return
}
if flush > 100 || flush < 0 {
writeError(http.StatusBadRequest, "expecting flush between 0-100")
return
}
}
speed := time.Millisecond * 10
if query.Has("speed") {
speed, err = time.ParseDuration(query.Get("speed"))
if err != nil {
writeError(http.StatusBadRequest, "invalid speed")
return
}
}
line := func(i int) string {
return fmt.Sprintf("Message #%d", i)
}
switch query.Get("format") {
case "json":
line = func(i int) string {
return fmt.Sprintf(`{"message": %d, "value": %.3f, "time": %d}`, i, rand.Float64(), time.Now().UnixMilli())
}
case "influx":
line = func(i int) string {
val := rand.Float64()
return fmt.Sprintf("measurement,tag1=value1,tag2=value2 message=%d,value=%.3f %d", i, val, time.Now().UnixMilli())
}
}
rw.WriteHeader(http.StatusOK)
for i := start; i <= count; i++ {
if _, err := io.WriteString(rw, line(i)+"\n"); err != nil {
ctxLogger.Error("Failed to write response", "error", err)
return
}
// This may send multiple lines in one chunk
if flush > rand.Intn(100) {
rw.(http.Flusher).Flush()
}
time.Sleep(speed)
}
}
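
For illustration, a request such as

    GET api/datasources/uid/<uid>/resources/stream?count=3&format=json&speed=100ms

responds with one JSON line roughly every 100ms (values below are made up):

    {"message": 1, "value": 0.481, "time": 1736928082000}
    {"message": 2, "value": 0.907, "time": 1736928082100}
    {"message": 3, "value": 0.112, "time": 1736928082200}

With format=influx the same messages are written in influx line protocol, and flush sets the percentage of writes that are flushed to the client immediately rather than coalesced into a larger chunk.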

View File

@@ -142,6 +142,83 @@ export class BackendSrv implements BackendService {
});
}
chunkRequestId = 1;
chunked(options: BackendSrvRequest): Observable<FetchResponse<Uint8Array | undefined>> {
const requestId = options.requestId ?? `chunked-${this.chunkRequestId++}`;
const controller = new AbortController();
const url = parseUrlFromOptions(options);
const init = parseInitFromOptions({
...options,
requestId,
abortSignal: controller.signal,
});
return new Observable((observer) => {
let done = false;
// Calling fromFetch explicitly avoids the request queue
const sub = this.dependencies.fromFetch(url, init).subscribe({
next: (response) => {
const rsp = {
status: response.status,
statusText: response.statusText,
ok: response.ok,
headers: response.headers,
url: response.url,
type: response.type,
redirected: response.redirected,
config: options,
traceId: response.headers.get(GRAFANA_TRACEID_HEADER) ?? undefined,
data: undefined,
};
if (!response.body) {
observer.next(rsp);
observer.complete();
return;
}
const reader = response.body.getReader();
async function process() {
while (reader && !done) {
if (controller.signal.aborted) {
reader.cancel(controller.signal.reason);
console.log(requestId, 'signal.aborted');
return;
}
const chunk = await reader.read();
observer.next({
...rsp,
data: chunk.value,
});
if (chunk.done) {
done = true;
console.log(requestId, 'done');
}
}
}
process()
.catch((e) => {
console.log(requestId, 'catch', e);
}) // from abort
.then(() => {
console.log(requestId, 'complete');
observer.complete();
}); // runs in background
},
error: (e) => {
observer.error(e);
},
});
return function unsubscribe() {
console.log(requestId, 'unsubscribe');
controller.abort('unsubscribe');
sub.unsubscribe();
};
});
}
private internalFetch<T>(options: BackendSrvRequest): Observable<FetchResponse<T>> {
if (options.requestId) {
this.inFlightRequests.next(options.requestId);
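
The teardown function returned from the Observable above ties RxJS unsubscription to fetch cancellation. A short sketch of cancelling mid-stream (the timeout is illustrative):

    import { getBackendSrv } from '@grafana/runtime';

    const sub = getBackendSrv()
      .chunked({ url: 'api/datasources/uid/<uid>/resources/stream' })
      .subscribe((rsp) => console.log(rsp.status, rsp.data?.byteLength));

    // unsubscribe() aborts the AbortController, so the pending reader.read()
    // rejects, the catch above swallows the abort error, and the background
    // read loop exits
    setTimeout(() => sub.unsubscribe(), 5000);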

View File

@@ -15,6 +15,7 @@ export const parseInitFromOptions = (options: BackendSrvRequest): RequestInit =>
headers,
body,
credentials,
signal: options.abortSignal,
};
};
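
Since the abort signal now flows into the RequestInit, regular fetch() requests can be cancelled the same way; a brief sketch (endpoint illustrative):

    import { getBackendSrv } from '@grafana/runtime';

    const controller = new AbortController();
    getBackendSrv()
      .fetch({ url: '/api/long/running/endpoint', abortSignal: controller.signal })
      .subscribe({ error: (err) => console.warn('request failed or was aborted', err) });

    // Aborting the signal cancels the in-flight browser fetch
    controller.abort();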
@@ -135,7 +136,6 @@
console.warn(`${response.url} returned an invalid JSON`);
return {} as T;
}
return await response.json();
case 'text':

View File

@@ -11,7 +11,10 @@ export function createFetchCorrelationsResponse<T>(overrides?: DeepPartial<Fetch
data: undefined,
status: 200,
url: '',
config: {
url: '',
abortSignal: undefined,
},
type: 'basic',
statusText: 'Ok',
redirected: false,

View File

@@ -76,6 +76,7 @@ export function setupExplore(options?: SetupOptions): {
setBackendSrv({
datasourceRequest: jest.fn().mockRejectedValue(undefined),
delete: jest.fn().mockRejectedValue(undefined),
chunked: jest.fn().mockRejectedValue(undefined),
fetch: jest.fn().mockImplementation((req) => {
let data: Record<string, string | object | number> = {};
if (req.url.startsWith('/api/datasources/correlations') && req.method === 'GET') {

View File

@@ -17,6 +17,7 @@ const types = [
{ value: 'logs', label: 'Logs' },
{ value: 'fetch', label: 'Fetch' },
{ value: 'traces', label: 'Traces' },
{ value: 'watch', label: 'Watch' },
];
export const StreamingClientEditor = ({ onChange, query }: EditorProps) => {
@@ -34,7 +35,7 @@ export const StreamingClientEditor = ({ onChange, query }: EditorProps) => {
const fields =
streamType === 'signal'
? streamingClientFields
: ['logs', 'traces', 'watch'].includes(streamType)
? [streamingClientFields[0]] // speed
: [];

View File

@@ -41,7 +41,7 @@ export interface StreamingQuery {
noise: number;
speed: number;
spread: number;
type: 'signal' | 'logs' | 'fetch' | 'traces' | 'watch';
url?: string;
}

View File

@@ -1,5 +1,5 @@
import { defaults } from 'lodash';
import { Observable, throwError } from 'rxjs';
import { v4 as uuidv4 } from 'uuid';
import {
@@ -18,6 +18,7 @@ import {
getDisplayProcessor,
createTheme,
} from '@grafana/data';
import { getBackendSrv } from '@grafana/runtime';
import { getRandomLine } from './LogIpsum';
import { TestDataDataQuery, StreamingQuery } from './dataquery';
@@ -44,6 +45,8 @@ export function runStream(
return runFetchStream(target, query, req);
case 'traces':
return runTracesStream(target, query, req);
case 'watch':
return runWatchStream(target, query, req);
}
throw new Error(`Unknown Stream Type: ${query.type}`);
}
@@ -176,6 +179,89 @@ export function runLogsStream(
});
}
interface StreamMessage {
message: number; // incrementing number
time: number;
value: number;
}
export function runWatchStream(
target: TestDataDataQuery,
query: StreamingQuery,
req: DataQueryRequest<TestDataDataQuery>
): Observable<DataQueryResponse> {
const uid = req.targets[0].datasource?.uid;
if (!uid) {
return throwError(() => new Error('expected datasource uid'));
}
return new Observable<DataQueryResponse>((subscriber) => {
const streamId = `watch-${req.panelId || 'explore'}-${target.refId}`;
const data = new CircularDataFrame({
append: 'tail',
capacity: req.maxDataPoints || 1000,
});
data.refId = target.refId;
data.name = target.alias || 'Logs ' + target.refId;
data.addField({ name: 'time', type: FieldType.time });
data.addField({ name: 'message', type: FieldType.number });
data.addField({ name: 'value', type: FieldType.number });
const decoder = new TextDecoder();
const sub = getBackendSrv()
.chunked({
url: `api/datasources/uid/${uid}/resources/stream`,
params: {
count: req.maxDataPoints || 1000, // connection will close when done
format: 'json',
speed: `${query.speed ?? 250}ms`,
flush: 85, // 85% (eg, sometimes send a few at a time)
},
})
.subscribe({
next: (chunk) => {
if (!chunk.data || !chunk.ok) {
console.info('chunk missing data', chunk);
return;
}
decoder
.decode(chunk.data, { stream: true })
.split('\n')
.forEach((line) => {
if (line?.length) {
try {
const msg: StreamMessage = JSON.parse(line);
data.fields[0].values.push(msg.time);
data.fields[1].values.push(msg.message);
data.fields[2].values.push(msg.value);
subscriber.next({
data: [data],
key: streamId,
state: LoadingState.Streaming,
});
} catch (err) {
console.warn('error parsing line', line, err);
}
}
});
},
error: (err) => {
console.warn('error in stream', streamId, err);
},
complete: () => {
console.info('complete stream', streamId);
},
});
return () => {
console.log('unsubscribing from stream', streamId);
sub.unsubscribe();
};
});
}
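
Note that decode(...).split('\n') above assumes each chunk ends on a line boundary; when a chunk ends mid-line, the trailing fragment fails JSON.parse and is dropped by the catch. A minimal buffering helper that would handle this (hypothetical, not part of this commit):

    const decoder = new TextDecoder();
    let buffer = '';

    // Decode a chunk, returning only the lines that are complete so far
    function completedLines(chunk: Uint8Array | undefined): string[] {
      buffer += decoder.decode(chunk, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop() ?? ''; // keep the partial tail for the next chunk
      return lines;
    }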
export function runFetchStream(
target: TestDataDataQuery,
query: StreamingQuery,