Merge remote-tracking branch 'origin/master' into fix/system-overview-page-reduce-error

This commit is contained in:
Dennis Oelkers
2026-03-12 16:39:03 +01:00
5 changed files with 276 additions and 0 deletions

View File

@@ -0,0 +1,4 @@
type = "a"
message = "Add support for handling input failures received from the forwarder."
pulls = ["25025"]

View File

@@ -218,6 +218,15 @@ public class FailureSubmissionService {
}
}
public void submitInputFailure(InputFailure inputFailure) {
try {
failureSubmissionQueue.submitBlocking(FailureBatch.inputFailureBatch(List.of(inputFailure)));
} catch (InterruptedException ignored) {
logger.warn("Failed to submit an input failure for failure handling. The thread has been interrupted!");
Thread.currentThread().interrupt();
}
}
private IndexingFailure fromIndexingError(IndexingError indexingError) {
return new IndexingFailure(
indexingError.error().type() == MappingError ?

View File

@@ -25,6 +25,7 @@ import org.joda.time.DateTime;
import java.util.Optional;
import static org.graylog2.plugin.Message.FIELD_GL2_FORWARDER_INPUT;
import static org.graylog2.plugin.Message.FIELD_GL2_SOURCE_INPUT;
import static org.graylog2.plugin.Message.FIELD_GL2_SOURCE_NODE;
import static org.graylog2.plugin.Message.FIELD_SOURCE;
@@ -37,6 +38,8 @@ public class InputFailure implements Failure {
private final DateTime failureTimestamp;
private final RawMessage rawMessage;
private final String originalMessage;
@Nullable
private String forwarderInputId;
public InputFailure(@Nonnull FailureCause failureCause,
@Nonnull String failureMessage,
@@ -108,6 +111,7 @@ public class InputFailure implements Failure {
.put(FIELD_GL2_SOURCE_INPUT, sourceNode.inputId)
.put(FIELD_GL2_SOURCE_NODE, sourceNode.nodeId)
);
builder.put(FIELD_GL2_FORWARDER_INPUT, forwarderInputId);
Optional.ofNullable(rawMessage.getRemoteAddress()).ifPresent(address ->
builder.put(FIELD_SOURCE, address.toString()));
@@ -123,4 +127,17 @@ public class InputFailure implements Failure {
public Object getMessageQueueId() {
return null;
}
public RawMessage getRawMessage() {
return rawMessage;
}
@Nullable
public String getForwarderInputId() {
return forwarderInputId;
}
public void setForwarderInputId(@Nullable String forwarderInputId) {
this.forwarderInputId = forwarderInputId;
}
}

View File

@@ -19,6 +19,7 @@ package org.graylog.failure;
import org.graylog2.indexer.messages.IndexingError;
import org.graylog2.inputs.diagnosis.InputDiagnosisMetrics;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.inputs.failure.InputProcessingException;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.shared.bindings.providers.ObjectMapperProvider;
@@ -34,6 +35,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.graylog.failure.InputFailureCause.INPUT_PARSE;
import static org.graylog2.indexer.messages.IndexingError.Type.MappingError;
import static org.graylog2.indexer.messages.IndexingError.Type.Unknown;
import static org.mockito.Mockito.lenient;
@@ -416,4 +418,38 @@ class FailureSubmissionServiceTest {
assertThat(fb.getFailures().get(0)).satisfies(indexingFailure ->
assertThat(indexingFailure.failureDetails()).isEqualTo("IllegalArgumentException: rootCauseMessage")));
}
@Test
void submitInputFailure() throws Exception {
final String failureMessage = "failure-message";
final String failureDetails = "failure-details";
InputFailure inputFailure = new InputFailure(
INPUT_PARSE,
failureMessage,
failureDetails,
Tools.nowUTC(),
new RawMessage(new byte[]{1, 2, 3}),
"original-payload"
);
underTest.submitInputFailure(inputFailure);
verify(failureSubmissionQueue, times(1)).submitBlocking(failureBatchCaptor.capture());
assertThat(failureBatchCaptor.getValue()).satisfies(batch -> {
assertThat(batch.containsInputFailures()).isTrue();
assertThat(batch.size()).isEqualTo(1);
assertThat(batch.getFailures().getFirst())
.isSameAs(inputFailure)
.satisfies(failure -> {
assertThat(failure.failureType()).isEqualTo(FailureType.INPUT);
assertThat(failure.failureCause().label()).isEqualTo(INPUT_PARSE.label());
assertThat(failure.message()).isEqualTo(failureMessage);
assertThat(failure.failureDetails()).isEqualTo(failureDetails);
assertThat(failure.failureTimestamp()).isNotNull();
assertThat(failure.requiresAcknowledgement()).isFalse();
});
});
}
}

View File

@@ -0,0 +1,210 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.shared.buffers.processors;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import org.graylog.failure.FailureSubmissionService;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.TestMessageFactory;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.buffers.MessageEvent;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.inputs.codecs.Codec;
import org.graylog2.plugin.inputs.failure.InputProcessingException;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.plugin.system.SimpleNodeId;
import org.graylog2.shared.messageq.MessageQueueAcknowledger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class DecodingProcessorTest {
private static final String CODEC_NAME = "test-codec";
private static final String INPUT_ID = "input-id";
private static final String NODE_ID = "node-id";
@Mock
private Codec codec;
@Mock
private Codec.Factory<Codec> codecFactory;
@Mock
private ServerStatus serverStatus;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private MetricRegistry metricRegistry;
@Mock
private MessageQueueAcknowledger acknowledger;
@Mock
private FailureSubmissionService failureSubmissionService;
@Captor
private ArgumentCaptor<InputProcessingException> exceptionCaptor;
private DecodingProcessor processor;
@BeforeEach
void setUp() {
Timer decodeTimer = new Timer();
Timer parseTimer = new Timer();
processor = new DecodingProcessor(
Map.of(CODEC_NAME, codecFactory),
serverStatus,
metricRegistry,
acknowledger,
failureSubmissionService,
decodeTimer,
parseTimer
);
}
private void setUpCodecFactory() {
when(codecFactory.create(any(Configuration.class))).thenReturn(codec);
}
@Test
void validMessageIsDecodedAndSetOnEvent() throws Exception {
setUpCodecFactory();
when(serverStatus.getDetailedMessageRecordingStrategy())
.thenReturn(ServerStatus.MessageDetailRecordingStrategy.NEVER);
Message decoded = new TestMessageFactory().createMessage("message", "source", Tools.nowUTC());
when(codec.decodeSafe(any(RawMessage.class))).thenReturn(Optional.of(decoded));
MessageEvent event = createEvent("valid-payload");
assertThat(event.getMessage()).isNull();
processor.onEvent(event, 0, true);
verify(failureSubmissionService, never()).submitInputFailure(any(), any());
assertThat(event.getMessage()).isNotNull();
}
@Test
void inputProcessingExceptionIsSubmittedAsFailure() throws Exception {
setUpCodecFactory();
final String errorMessage = "GELF message is missing mandatory 'short_message' field.";
final String payload = "invalid-gelf";
RawMessage rawMessage = createRawMessage(payload);
InputProcessingException exception = InputProcessingException.create(
errorMessage,
new IllegalArgumentException("Missing field: short_message"),
rawMessage,
payload
);
when(codec.decodeSafe(any(RawMessage.class))).thenThrow(exception);
MessageEvent event = new MessageEvent();
event.setRaw(rawMessage);
processor.onEvent(event, 0, true);
verify(failureSubmissionService).submitInputFailure(exceptionCaptor.capture(), eq(INPUT_ID));
InputProcessingException captured = exceptionCaptor.getValue();
assertThat(captured.getMessage()).isEqualTo(errorMessage);
assertThat(captured.getRawMessage()).isSameAs(rawMessage);
assertThat(captured.getRawMessage().getPayload()).isEqualTo(payload.getBytes(StandardCharsets.UTF_8));
// Message is acknowledged twice (in the catch and finally block)
verify(acknowledger, times(2)).acknowledge(rawMessage.getMessageQueueId());
}
@Test
void runtimeExceptionIsSubmittedAsFailure() throws Exception {
setUpCodecFactory();
RawMessage rawMessage = createRawMessage("payload");
final String exceptionMessage = "Unable to decode raw message due to an unexpected error.";
final String exceptionCause = "unexpected codec error.";
when(codec.decodeSafe(any(RawMessage.class)))
.thenThrow(new RuntimeException(exceptionCause));
MessageEvent event = new MessageEvent();
event.setRaw(rawMessage);
processor.onEvent(event, 0, true);
verify(failureSubmissionService).submitInputFailure(exceptionCaptor.capture(), eq(INPUT_ID));
InputProcessingException captured = exceptionCaptor.getValue();
assertThat(captured.getMessage()).isEqualTo(exceptionMessage);
assertThat(captured.getCause()).isInstanceOf(RuntimeException.class);
assertThat(captured.getCause().getMessage()).isEqualTo(exceptionCause);
// Message is acknowledged twice (in the catch and finally block)
verify(acknowledger, times(2)).acknowledge(rawMessage.getMessageQueueId());
}
@Test
void missingCodecFactorySkipsMessageWithoutFailure() throws Exception {
RawMessage rawMessage = createRawMessage("payload");
rawMessage.setCodecName("unknown-codec");
MessageEvent event = new MessageEvent();
event.setRaw(rawMessage);
processor.onEvent(event, 0, true);
// Message is skipped when no codec factory is found
verify(failureSubmissionService, never()).submitInputFailure(any(), any());
verify(acknowledger).acknowledge(rawMessage.getMessageQueueId());
}
private MessageEvent createEvent(String payload) {
MessageEvent event = new MessageEvent();
event.setRaw(createRawMessage(payload));
return event;
}
private RawMessage createRawMessage(String payload) {
RawMessage raw = new RawMessage(payload.getBytes(StandardCharsets.UTF_8));
raw.setCodecName(CODEC_NAME);
raw.setCodecConfig(Configuration.EMPTY_CONFIGURATION);
raw.addSourceNode(INPUT_ID, new SimpleNodeId(NODE_ID));
return raw;
}
}