diff --git a/changelog/unreleased/pr-25025.toml b/changelog/unreleased/pr-25025.toml new file mode 100644 index 0000000000..a2ee600491 --- /dev/null +++ b/changelog/unreleased/pr-25025.toml @@ -0,0 +1,4 @@ +type = "a" +message = "Add support for handling input failures received from the forwarder." + +pulls = ["25025"] diff --git a/graylog2-server/src/main/java/org/graylog/failure/FailureSubmissionService.java b/graylog2-server/src/main/java/org/graylog/failure/FailureSubmissionService.java index 4dab7d54b1..3231d11d25 100644 --- a/graylog2-server/src/main/java/org/graylog/failure/FailureSubmissionService.java +++ b/graylog2-server/src/main/java/org/graylog/failure/FailureSubmissionService.java @@ -218,6 +218,15 @@ public class FailureSubmissionService { } } + public void submitInputFailure(InputFailure inputFailure) { + try { + failureSubmissionQueue.submitBlocking(FailureBatch.inputFailureBatch(List.of(inputFailure))); + } catch (InterruptedException ignored) { + logger.warn("Failed to submit an input failure for failure handling. The thread has been interrupted!"); + Thread.currentThread().interrupt(); + } + } + private IndexingFailure fromIndexingError(IndexingError indexingError) { return new IndexingFailure( indexingError.error().type() == MappingError ? diff --git a/graylog2-server/src/main/java/org/graylog/failure/InputFailure.java b/graylog2-server/src/main/java/org/graylog/failure/InputFailure.java index dd4541cc9c..8dd1df70c8 100644 --- a/graylog2-server/src/main/java/org/graylog/failure/InputFailure.java +++ b/graylog2-server/src/main/java/org/graylog/failure/InputFailure.java @@ -25,6 +25,7 @@ import org.joda.time.DateTime; import java.util.Optional; +import static org.graylog2.plugin.Message.FIELD_GL2_FORWARDER_INPUT; import static org.graylog2.plugin.Message.FIELD_GL2_SOURCE_INPUT; import static org.graylog2.plugin.Message.FIELD_GL2_SOURCE_NODE; import static org.graylog2.plugin.Message.FIELD_SOURCE; @@ -37,6 +38,8 @@ public class InputFailure implements Failure { private final DateTime failureTimestamp; private final RawMessage rawMessage; private final String originalMessage; + @Nullable + private String forwarderInputId; public InputFailure(@Nonnull FailureCause failureCause, @Nonnull String failureMessage, @@ -108,6 +111,7 @@ public class InputFailure implements Failure { .put(FIELD_GL2_SOURCE_INPUT, sourceNode.inputId) .put(FIELD_GL2_SOURCE_NODE, sourceNode.nodeId) ); + builder.put(FIELD_GL2_FORWARDER_INPUT, forwarderInputId); Optional.ofNullable(rawMessage.getRemoteAddress()).ifPresent(address -> builder.put(FIELD_SOURCE, address.toString())); @@ -123,4 +127,17 @@ public class InputFailure implements Failure { public Object getMessageQueueId() { return null; } + + public RawMessage getRawMessage() { + return rawMessage; + } + + @Nullable + public String getForwarderInputId() { + return forwarderInputId; + } + + public void setForwarderInputId(@Nullable String forwarderInputId) { + this.forwarderInputId = forwarderInputId; + } } diff --git a/graylog2-server/src/test/java/org/graylog/failure/FailureSubmissionServiceTest.java b/graylog2-server/src/test/java/org/graylog/failure/FailureSubmissionServiceTest.java index c03951afb5..e3aa8db87b 100644 --- a/graylog2-server/src/test/java/org/graylog/failure/FailureSubmissionServiceTest.java +++ b/graylog2-server/src/test/java/org/graylog/failure/FailureSubmissionServiceTest.java @@ -19,6 +19,7 @@ package org.graylog.failure; import org.graylog2.indexer.messages.IndexingError; import org.graylog2.inputs.diagnosis.InputDiagnosisMetrics; import org.graylog2.plugin.Message; +import org.graylog2.plugin.Tools; import org.graylog2.plugin.inputs.failure.InputProcessingException; import org.graylog2.plugin.journal.RawMessage; import org.graylog2.shared.bindings.providers.ObjectMapperProvider; @@ -34,6 +35,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import java.util.List; import static org.assertj.core.api.Assertions.assertThat; +import static org.graylog.failure.InputFailureCause.INPUT_PARSE; import static org.graylog2.indexer.messages.IndexingError.Type.MappingError; import static org.graylog2.indexer.messages.IndexingError.Type.Unknown; import static org.mockito.Mockito.lenient; @@ -416,4 +418,38 @@ class FailureSubmissionServiceTest { assertThat(fb.getFailures().get(0)).satisfies(indexingFailure -> assertThat(indexingFailure.failureDetails()).isEqualTo("IllegalArgumentException: rootCauseMessage"))); } + + @Test + void submitInputFailure() throws Exception { + final String failureMessage = "failure-message"; + final String failureDetails = "failure-details"; + + InputFailure inputFailure = new InputFailure( + INPUT_PARSE, + failureMessage, + failureDetails, + Tools.nowUTC(), + new RawMessage(new byte[]{1, 2, 3}), + "original-payload" + ); + + underTest.submitInputFailure(inputFailure); + + verify(failureSubmissionQueue, times(1)).submitBlocking(failureBatchCaptor.capture()); + assertThat(failureBatchCaptor.getValue()).satisfies(batch -> { + assertThat(batch.containsInputFailures()).isTrue(); + assertThat(batch.size()).isEqualTo(1); + + assertThat(batch.getFailures().getFirst()) + .isSameAs(inputFailure) + .satisfies(failure -> { + assertThat(failure.failureType()).isEqualTo(FailureType.INPUT); + assertThat(failure.failureCause().label()).isEqualTo(INPUT_PARSE.label()); + assertThat(failure.message()).isEqualTo(failureMessage); + assertThat(failure.failureDetails()).isEqualTo(failureDetails); + assertThat(failure.failureTimestamp()).isNotNull(); + assertThat(failure.requiresAcknowledgement()).isFalse(); + }); + }); + } } diff --git a/graylog2-server/src/test/java/org/graylog2/shared/buffers/processors/DecodingProcessorTest.java b/graylog2-server/src/test/java/org/graylog2/shared/buffers/processors/DecodingProcessorTest.java new file mode 100644 index 0000000000..f0690b4206 --- /dev/null +++ b/graylog2-server/src/test/java/org/graylog2/shared/buffers/processors/DecodingProcessorTest.java @@ -0,0 +1,210 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog2.shared.buffers.processors; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import org.graylog.failure.FailureSubmissionService; +import org.graylog2.plugin.Message; +import org.graylog2.plugin.ServerStatus; +import org.graylog2.plugin.TestMessageFactory; +import org.graylog2.plugin.Tools; +import org.graylog2.plugin.buffers.MessageEvent; +import org.graylog2.plugin.configuration.Configuration; +import org.graylog2.plugin.inputs.codecs.Codec; +import org.graylog2.plugin.inputs.failure.InputProcessingException; +import org.graylog2.plugin.journal.RawMessage; +import org.graylog2.plugin.system.SimpleNodeId; +import org.graylog2.shared.messageq.MessageQueueAcknowledger; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Answers; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class DecodingProcessorTest { + + private static final String CODEC_NAME = "test-codec"; + private static final String INPUT_ID = "input-id"; + private static final String NODE_ID = "node-id"; + + @Mock + private Codec codec; + + @Mock + private Codec.Factory codecFactory; + + @Mock + private ServerStatus serverStatus; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private MetricRegistry metricRegistry; + + @Mock + private MessageQueueAcknowledger acknowledger; + + @Mock + private FailureSubmissionService failureSubmissionService; + + @Captor + private ArgumentCaptor exceptionCaptor; + + private DecodingProcessor processor; + + @BeforeEach + void setUp() { + Timer decodeTimer = new Timer(); + Timer parseTimer = new Timer(); + + processor = new DecodingProcessor( + Map.of(CODEC_NAME, codecFactory), + serverStatus, + metricRegistry, + acknowledger, + failureSubmissionService, + decodeTimer, + parseTimer + ); + } + + private void setUpCodecFactory() { + when(codecFactory.create(any(Configuration.class))).thenReturn(codec); + } + + @Test + void validMessageIsDecodedAndSetOnEvent() throws Exception { + setUpCodecFactory(); + when(serverStatus.getDetailedMessageRecordingStrategy()) + .thenReturn(ServerStatus.MessageDetailRecordingStrategy.NEVER); + Message decoded = new TestMessageFactory().createMessage("message", "source", Tools.nowUTC()); + when(codec.decodeSafe(any(RawMessage.class))).thenReturn(Optional.of(decoded)); + + MessageEvent event = createEvent("valid-payload"); + assertThat(event.getMessage()).isNull(); + + processor.onEvent(event, 0, true); + + verify(failureSubmissionService, never()).submitInputFailure(any(), any()); + assertThat(event.getMessage()).isNotNull(); + } + + @Test + void inputProcessingExceptionIsSubmittedAsFailure() throws Exception { + setUpCodecFactory(); + + final String errorMessage = "GELF message is missing mandatory 'short_message' field."; + final String payload = "invalid-gelf"; + + RawMessage rawMessage = createRawMessage(payload); + InputProcessingException exception = InputProcessingException.create( + errorMessage, + new IllegalArgumentException("Missing field: short_message"), + rawMessage, + payload + ); + + when(codec.decodeSafe(any(RawMessage.class))).thenThrow(exception); + + MessageEvent event = new MessageEvent(); + event.setRaw(rawMessage); + + processor.onEvent(event, 0, true); + + verify(failureSubmissionService).submitInputFailure(exceptionCaptor.capture(), eq(INPUT_ID)); + + InputProcessingException captured = exceptionCaptor.getValue(); + assertThat(captured.getMessage()).isEqualTo(errorMessage); + assertThat(captured.getRawMessage()).isSameAs(rawMessage); + assertThat(captured.getRawMessage().getPayload()).isEqualTo(payload.getBytes(StandardCharsets.UTF_8)); + + // Message is acknowledged twice (in the catch and finally block) + verify(acknowledger, times(2)).acknowledge(rawMessage.getMessageQueueId()); + } + + @Test + void runtimeExceptionIsSubmittedAsFailure() throws Exception { + setUpCodecFactory(); + RawMessage rawMessage = createRawMessage("payload"); + + final String exceptionMessage = "Unable to decode raw message due to an unexpected error."; + final String exceptionCause = "unexpected codec error."; + + when(codec.decodeSafe(any(RawMessage.class))) + .thenThrow(new RuntimeException(exceptionCause)); + + MessageEvent event = new MessageEvent(); + event.setRaw(rawMessage); + + processor.onEvent(event, 0, true); + + verify(failureSubmissionService).submitInputFailure(exceptionCaptor.capture(), eq(INPUT_ID)); + + InputProcessingException captured = exceptionCaptor.getValue(); + assertThat(captured.getMessage()).isEqualTo(exceptionMessage); + assertThat(captured.getCause()).isInstanceOf(RuntimeException.class); + assertThat(captured.getCause().getMessage()).isEqualTo(exceptionCause); + + // Message is acknowledged twice (in the catch and finally block) + verify(acknowledger, times(2)).acknowledge(rawMessage.getMessageQueueId()); + } + + @Test + void missingCodecFactorySkipsMessageWithoutFailure() throws Exception { + RawMessage rawMessage = createRawMessage("payload"); + rawMessage.setCodecName("unknown-codec"); + + MessageEvent event = new MessageEvent(); + event.setRaw(rawMessage); + + processor.onEvent(event, 0, true); + + // Message is skipped when no codec factory is found + verify(failureSubmissionService, never()).submitInputFailure(any(), any()); + + verify(acknowledger).acknowledge(rawMessage.getMessageQueueId()); + } + + private MessageEvent createEvent(String payload) { + MessageEvent event = new MessageEvent(); + event.setRaw(createRawMessage(payload)); + return event; + } + + private RawMessage createRawMessage(String payload) { + RawMessage raw = new RawMessage(payload.getBytes(StandardCharsets.UTF_8)); + raw.setCodecName(CODEC_NAME); + raw.setCodecConfig(Configuration.EMPTY_CONFIGURATION); + raw.addSourceNode(INPUT_ID, new SimpleNodeId(NODE_ID)); + return raw; + } +}