Pipelines new column: Processing Errors (#25003)

* Pipelines new column Processing Errors

* Update graylog2-web-interface/src/components/pipelines/PipelineListItem.tsx

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Laura Bergenthal-Grotlüschen <197286649+laura-b-g@users.noreply.github.com>
This commit is contained in:
Mohamed OULD HOCINE
2026-02-23 16:08:54 +01:00
committed by GitHub
parent ad75e046e3
commit 8b9d6d1ef8
4 changed files with 273 additions and 2 deletions

View File

@@ -30,6 +30,8 @@ import useGetPermissionsByScope from 'hooks/useScopePermissions';
import RuleDeprecationInfo from 'components/rules/RuleDeprecationInfo';
import usePermissions from 'hooks/usePermissions';
import PipelineProcessingErrors from './PipelineProcessingErrors';
import ButtonToolbar from '../bootstrap/ButtonToolbar';
import { Spinner } from '../common';
@@ -141,15 +143,19 @@ const PipelineListItem = ({ pipeline, pipelines, connections, streams, onDeleteP
noConnectionsMessage={<em>Not connected</em>}
/>
</StreamListTD>
<td>
<PipelineProcessingErrors pipeline={pipeline} />
</td>
<td>{_formatStages()}</td>
<td>
<ButtonToolbar>
<LinkContainer to={Routes.SYSTEM.PIPELINES.PIPELINE(id)}>
<LinkContainer to={Routes.SYSTEM.PIPELINES.PIPELINE(id)} aria-label="Edit Pipeline">
<Button disabled={!isPermitted('pipeline:edit')} bsSize="xsmall">
Edit
</Button>
</LinkContainer>
<Button
aria-label="Delete Pipeline"
disabled={!isPermitted('pipeline:delete') || isNotDeletable}
bsStyle="danger"
bsSize="xsmall"

View File

@@ -0,0 +1,144 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
import * as React from 'react';
import { render, screen } from 'wrappedTestingLibrary';
import type { PipelineType } from 'components/pipelines/types';
import usePipelineRulesMetadata from 'components/rules/hooks/usePipelineRulesMetadata';
import { useStore } from 'stores/connect';
import { MetricsActions } from 'stores/metrics/MetricsStore';
import PipelineProcessingErrors, { getPipelineRuleFailureMetricNames } from './PipelineProcessingErrors';
jest.mock('components/rules/hooks/usePipelineRulesMetadata');
jest.mock('stores/connect', () => ({
__esModule: true,
useStore: jest.fn(),
}));
jest.mock('stores/metrics/MetricsStore', () => ({
MetricsStore: {},
MetricsActions: {
addGlobal: jest.fn(),
removeGlobal: jest.fn(),
},
}));
jest.mock('components/metrics', () => ({
CounterRate: ({ metric }: { metric: { count: number } }) => (
<span data-testid="pipeline-processing-errors-rate">{metric?.count} errors/s</span>
),
}));
describe('PipelineProcessingErrors', () => {
const pipeline: PipelineType = {
id: 'pipeline-1',
title: 'Test Pipeline',
description: 'Test pipeline description',
source: '',
created_at: '2024-01-01T00:00:00.000Z',
modified_at: '2024-01-01T00:00:00.000Z',
stages: [
{ stage: 0, match: 'ALL', rules: ['rule-1-title'] },
{ stage: 2, match: 'EITHER', rules: ['rule-2-title'] },
],
errors: null,
has_deprecated_functions: false,
_scope: 'DEFAULT',
};
const mockPipelineRulesMetadata = {
functions: [],
streams: [],
rules: ['rule-1', 'rule-2'],
deprecated_functions: [],
};
beforeEach(() => {
(usePipelineRulesMetadata as jest.Mock).mockReturnValue({
data: mockPipelineRulesMetadata,
isLoading: false,
refetch: jest.fn(),
});
(useStore as jest.Mock).mockReturnValue({
metrics: {
node1: {
'org.graylog.plugins.pipelineprocessor.ast.Rule.rule-1.pipeline-1.0.failed': {
type: 'meter',
metric: { rate: { total: 2 } },
},
'org.graylog.plugins.pipelineprocessor.ast.Rule.rule-2.pipeline-1.2.failed': {
type: 'meter',
metric: { rate: { total: 3 } },
},
},
node2: {
'org.graylog.plugins.pipelineprocessor.ast.Rule.rule-1.pipeline-1.0.failed': {
type: 'meter',
metric: { rate: { total: 5 } },
},
},
},
});
jest.clearAllMocks();
});
it('builds unique rule failure metric names per stage', () => {
const names = getPipelineRuleFailureMetricNames(
{
...pipeline,
stages: [
{ stage: 0, match: 'ALL', rules: [] },
{ stage: 0, match: 'EITHER', rules: [] },
{ stage: 1, match: 'PASS', rules: [] },
],
},
['rule-1', 'rule-1'],
);
expect(names).toEqual([
'org.graylog.plugins.pipelineprocessor.ast.Rule.rule-1.pipeline-1.0.failed',
'org.graylog.plugins.pipelineprocessor.ast.Rule.rule-1.pipeline-1.1.failed',
]);
});
it('registers metrics and renders total failures across nodes', () => {
const { unmount } = render(<PipelineProcessingErrors pipeline={pipeline} />);
const expectedMetricNames = [
'org.graylog.plugins.pipelineprocessor.ast.Rule.rule-1.pipeline-1.0.failed',
'org.graylog.plugins.pipelineprocessor.ast.Rule.rule-1.pipeline-1.2.failed',
'org.graylog.plugins.pipelineprocessor.ast.Rule.rule-2.pipeline-1.0.failed',
'org.graylog.plugins.pipelineprocessor.ast.Rule.rule-2.pipeline-1.2.failed',
];
expect(MetricsActions.addGlobal).toHaveBeenCalledTimes(expectedMetricNames.length);
expectedMetricNames.forEach((name) => {
expect(MetricsActions.addGlobal).toHaveBeenCalledWith(name);
});
expect(screen.getByTestId('pipeline-processing-errors-rate')).toHaveTextContent('10 errors/s');
expect(screen.getByText('(10 total)')).toBeInTheDocument();
unmount();
expect(MetricsActions.removeGlobal).toHaveBeenCalledTimes(expectedMetricNames.length);
expectedMetricNames.forEach((name) => {
expect(MetricsActions.removeGlobal).toHaveBeenCalledWith(name);
});
});
});

View File

@@ -0,0 +1,121 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
import * as React from 'react';
import { useEffect, useMemo } from 'react';
import numeral from 'numeral';
import { CounterRate } from 'components/metrics';
import usePipelineRulesMetadata from 'components/rules/hooks/usePipelineRulesMetadata';
import type { PipelineType } from 'components/pipelines/types';
import { useStore } from 'stores/connect';
import type { ClusterMetric, Metric, NodeMetric } from 'stores/metrics/MetricsStore';
import { MetricsActions, MetricsStore } from 'stores/metrics/MetricsStore';
type Props = {
pipeline: PipelineType;
};
const INITIAL_METRIC = {
full_name: '',
count: 0,
};
const METRIC_PREFIX = 'org.graylog.plugins.pipelineprocessor.ast.Rule';
const METRIC_SUFFIX = 'failed';
const metricName = (ruleId: string, pipelineId: string, stage: number) =>
`${METRIC_PREFIX}.${ruleId}.${pipelineId}.${stage}.${METRIC_SUFFIX}`;
const metricCount = (metric?: Metric): number => {
if (!metric) {
return 0;
}
switch (metric.type) {
case 'counter':
return metric.metric.count;
case 'gauge':
return metric.metric.value;
case 'histogram':
return metric.metric.count;
case 'meter':
return metric.metric.rate.total;
case 'timer':
return metric.metric.rate.total;
default:
return 0;
}
};
export const getPipelineRuleFailureMetricNames = (pipeline: PipelineType, ruleIds: string[]): Array<string> => {
const stages = Array.from(new Set(pipeline.stages.map(({ stage }) => stage)));
const uniqueRuleIds = Array.from(new Set(ruleIds));
return uniqueRuleIds.flatMap((ruleId) => stages.map((stage) => metricName(ruleId, pipeline.id, stage)));
};
const PipelineProcessingErrors = ({ pipeline }: Props) => {
const { data: pipelineRulesMetadata } = usePipelineRulesMetadata(pipeline.id, {
enabled: !!pipeline.id,
});
const { metrics }: { metrics: ClusterMetric } = useStore(MetricsStore, (state) => ({
metrics: state.metrics ?? {},
}));
const metricNames = useMemo(
() => getPipelineRuleFailureMetricNames(pipeline, pipelineRulesMetadata?.rules ?? []),
[pipeline, pipelineRulesMetadata?.rules],
);
useEffect(() => {
metricNames.forEach((name) => MetricsActions.addGlobal(name));
return () => {
metricNames.forEach((name) => MetricsActions.removeGlobal(name));
};
}, [metricNames]);
const totalErrors = useMemo(
() =>
Object.values(metrics ?? {}).reduce((clusterTotal: number, nodeMetrics: NodeMetric) => {
const nodeTotal = metricNames.reduce(
(sum: number, currentMetricName: string) => sum + metricCount(nodeMetrics[currentMetricName]),
0,
);
return clusterTotal + nodeTotal;
}, 0),
[metricNames, metrics],
);
return (
<span>
<CounterRate
metric={{
...INITIAL_METRIC,
full_name: `pipeline.${pipeline.id}.failed`,
count: totalErrors,
}}
suffix="errors/s"
/>
<br />
<span className="number-format">({numeral(totalErrors).format('0')} total)</span>
</span>
);
};
export default PipelineProcessingErrors;

View File

@@ -126,7 +126,7 @@ const ProcessingTimelineComponent = () => {
onDeletePipeline={() => _deletePipeline(pipelineItem)}
/>
);
const headers = ['Pipeline', 'Connected to Streams', 'Processing Timeline', 'Actions'];
const headers = ['Pipeline', 'Connected to Streams', 'Processing Errors', 'Processing Timeline', 'Actions'];
return (
<div>