diff --git a/changelog/unreleased/pr-25025.toml b/changelog/unreleased/pr-25025.toml new file mode 100644 index 0000000000..a2ee600491 --- /dev/null +++ b/changelog/unreleased/pr-25025.toml @@ -0,0 +1,4 @@ +type = "a" +message = "Add support for handling input failures received from the forwarder." + +pulls = ["25025"] diff --git a/graylog2-server/src/main/java/org/graylog/failure/FailureSubmissionService.java b/graylog2-server/src/main/java/org/graylog/failure/FailureSubmissionService.java index 4dab7d54b1..3231d11d25 100644 --- a/graylog2-server/src/main/java/org/graylog/failure/FailureSubmissionService.java +++ b/graylog2-server/src/main/java/org/graylog/failure/FailureSubmissionService.java @@ -218,6 +218,15 @@ public class FailureSubmissionService { } } + public void submitInputFailure(InputFailure inputFailure) { + try { + failureSubmissionQueue.submitBlocking(FailureBatch.inputFailureBatch(List.of(inputFailure))); + } catch (InterruptedException ignored) { + logger.warn("Failed to submit an input failure for failure handling. The thread has been interrupted!"); + Thread.currentThread().interrupt(); + } + } + private IndexingFailure fromIndexingError(IndexingError indexingError) { return new IndexingFailure( indexingError.error().type() == MappingError ? 
diff --git a/graylog2-server/src/main/java/org/graylog/failure/InputFailure.java b/graylog2-server/src/main/java/org/graylog/failure/InputFailure.java index dd4541cc9c..8dd1df70c8 100644 --- a/graylog2-server/src/main/java/org/graylog/failure/InputFailure.java +++ b/graylog2-server/src/main/java/org/graylog/failure/InputFailure.java @@ -25,6 +25,7 @@ import org.joda.time.DateTime; import java.util.Optional; +import static org.graylog2.plugin.Message.FIELD_GL2_FORWARDER_INPUT; import static org.graylog2.plugin.Message.FIELD_GL2_SOURCE_INPUT; import static org.graylog2.plugin.Message.FIELD_GL2_SOURCE_NODE; import static org.graylog2.plugin.Message.FIELD_SOURCE; @@ -37,6 +38,8 @@ public class InputFailure implements Failure { private final DateTime failureTimestamp; private final RawMessage rawMessage; private final String originalMessage; + @Nullable + private String forwarderInputId; public InputFailure(@Nonnull FailureCause failureCause, @Nonnull String failureMessage, @@ -108,6 +111,7 @@ public class InputFailure implements Failure { .put(FIELD_GL2_SOURCE_INPUT, sourceNode.inputId) .put(FIELD_GL2_SOURCE_NODE, sourceNode.nodeId) ); + builder.put(FIELD_GL2_FORWARDER_INPUT, forwarderInputId); Optional.ofNullable(rawMessage.getRemoteAddress()).ifPresent(address -> builder.put(FIELD_SOURCE, address.toString())); @@ -123,4 +127,17 @@ public class InputFailure implements Failure { public Object getMessageQueueId() { return null; } + + public RawMessage getRawMessage() { + return rawMessage; + } + + @Nullable + public String getForwarderInputId() { + return forwarderInputId; + } + + public void setForwarderInputId(@Nullable String forwarderInputId) { + this.forwarderInputId = forwarderInputId; + } } diff --git a/graylog2-server/src/main/java/org/graylog2/cluster/lock/RefreshingLockService.java b/graylog2-server/src/main/java/org/graylog2/cluster/lock/RefreshingLockService.java index 9f5c73f63b..361daa0f9d 100644 --- 
a/graylog2-server/src/main/java/org/graylog2/cluster/lock/RefreshingLockService.java +++ b/graylog2-server/src/main/java/org/graylog2/cluster/lock/RefreshingLockService.java @@ -26,6 +26,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Strings.isNullOrEmpty; import static org.graylog2.shared.utilities.StringUtils.f; @@ -61,11 +62,10 @@ public class RefreshingLockService implements AutoCloseable { * @throws AlreadyLockedException when the resource couldn't be locked */ public void acquireAndKeepLock(String resource, int maxConcurrency) throws AlreadyLockedException { - Optional<Lock> optionalLock = lockService.lock(resource, maxConcurrency); - if (optionalLock.isEmpty()) { - throw new AlreadyLockedException(f("Could not acquire lock for resource <%s> with max concurrency <%d>", resource, maxConcurrency)); - } - scheduleLock(optionalLock.get()); + assertNoLockYet(); + final var newLock = lockService.lock(resource, maxConcurrency) + .orElseThrow(() -> new AlreadyLockedException(f("Could not acquire lock for resource <%s> with max concurrency <%d>", resource, maxConcurrency))); + scheduleLock(newLock); } /** @@ -76,12 +76,15 @@ public class RefreshingLockService implements AutoCloseable { * @throws AlreadyLockedException when the resource couldn't be locked */ public void acquireAndKeepLock(String resource, String lockContext) throws AlreadyLockedException { + assertNoLockYet(); checkArgument(!isNullOrEmpty(lockContext), "lockContext cannot be blank"); - Optional<Lock> optionalLock = lockService.lock(resource, lockContext); - if (optionalLock.isEmpty()) { - throw new AlreadyLockedException(f("Could not acquire lock for resource <%s> and lock context <%s>", resource, lockContext)); - } - scheduleLock(optionalLock.get()); + final var newLock = lockService.lock(resource, 
lockContext) + .orElseThrow(() -> new AlreadyLockedException(f("Could not acquire lock for resource <%s> and lock context <%s>", resource, lockContext))); + scheduleLock(newLock); + } + + private void assertNoLockYet() { + checkState(lock == null, "Unable to acquire new lock, already holding lock that would get lost: " + lock); } private void scheduleLock(Lock newLock) { diff --git a/graylog2-server/src/test/java/org/graylog/failure/FailureSubmissionServiceTest.java b/graylog2-server/src/test/java/org/graylog/failure/FailureSubmissionServiceTest.java index c03951afb5..e3aa8db87b 100644 --- a/graylog2-server/src/test/java/org/graylog/failure/FailureSubmissionServiceTest.java +++ b/graylog2-server/src/test/java/org/graylog/failure/FailureSubmissionServiceTest.java @@ -19,6 +19,7 @@ package org.graylog.failure; import org.graylog2.indexer.messages.IndexingError; import org.graylog2.inputs.diagnosis.InputDiagnosisMetrics; import org.graylog2.plugin.Message; +import org.graylog2.plugin.Tools; import org.graylog2.plugin.inputs.failure.InputProcessingException; import org.graylog2.plugin.journal.RawMessage; import org.graylog2.shared.bindings.providers.ObjectMapperProvider; @@ -34,6 +35,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import java.util.List; import static org.assertj.core.api.Assertions.assertThat; +import static org.graylog.failure.InputFailureCause.INPUT_PARSE; import static org.graylog2.indexer.messages.IndexingError.Type.MappingError; import static org.graylog2.indexer.messages.IndexingError.Type.Unknown; import static org.mockito.Mockito.lenient; @@ -416,4 +418,38 @@ class FailureSubmissionServiceTest { assertThat(fb.getFailures().get(0)).satisfies(indexingFailure -> assertThat(indexingFailure.failureDetails()).isEqualTo("IllegalArgumentException: rootCauseMessage"))); } + + @Test + void submitInputFailure() throws Exception { + final String failureMessage = "failure-message"; + final String failureDetails = "failure-details"; + + InputFailure 
inputFailure = new InputFailure( + INPUT_PARSE, + failureMessage, + failureDetails, + Tools.nowUTC(), + new RawMessage(new byte[]{1, 2, 3}), + "original-payload" + ); + + underTest.submitInputFailure(inputFailure); + + verify(failureSubmissionQueue, times(1)).submitBlocking(failureBatchCaptor.capture()); + assertThat(failureBatchCaptor.getValue()).satisfies(batch -> { + assertThat(batch.containsInputFailures()).isTrue(); + assertThat(batch.size()).isEqualTo(1); + + assertThat(batch.getFailures().getFirst()) + .isSameAs(inputFailure) + .satisfies(failure -> { + assertThat(failure.failureType()).isEqualTo(FailureType.INPUT); + assertThat(failure.failureCause().label()).isEqualTo(INPUT_PARSE.label()); + assertThat(failure.message()).isEqualTo(failureMessage); + assertThat(failure.failureDetails()).isEqualTo(failureDetails); + assertThat(failure.failureTimestamp()).isNotNull(); + assertThat(failure.requiresAcknowledgement()).isFalse(); + }); + }); + } } diff --git a/graylog2-server/src/test/java/org/graylog2/cluster/lock/RefreshingLockServiceTest.java b/graylog2-server/src/test/java/org/graylog2/cluster/lock/RefreshingLockServiceTest.java new file mode 100644 index 0000000000..cb3537fa70 --- /dev/null +++ b/graylog2-server/src/test/java/org/graylog2/cluster/lock/RefreshingLockServiceTest.java @@ -0,0 +1,110 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ */ +package org.graylog2.cluster.lock; + +import org.graylog2.shared.SuppressForbidden; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.Duration; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class RefreshingLockServiceTest { + + @Mock + private LockService lockService; + + private ScheduledExecutorService scheduler; + + private RefreshingLockService refreshingLockService; + + private final Lock firstLock = Lock.builder() + .resource("first-resource") + .lockedBy("node-123") + .createdAt(ZonedDateTime.now(ZoneOffset.UTC)) + .updatedAt(ZonedDateTime.now(ZoneOffset.UTC)) + .build(); + + private final Lock secondLock = Lock.builder() + .resource("second-resource") + .lockedBy("node-123") + .createdAt(ZonedDateTime.now(ZoneOffset.UTC)) + .updatedAt(ZonedDateTime.now(ZoneOffset.UTC)) + .build(); + + @BeforeEach + @SuppressForbidden("Using Executors.newSingleThreadScheduledExecutor() is okay in tests") + void setUp() { + scheduler = Executors.newSingleThreadScheduledExecutor(); + refreshingLockService = new RefreshingLockService( + lockService, + scheduler, + Duration.ofMinutes(5) + ); + } + + @AfterEach + void tearDown() { + if (scheduler != null) { + scheduler.shutdownNow(); + } + } + + @Test + void throwsIllegalStateExceptionWhenAcquiringLockWhileAlreadyHoldingOne() throws AlreadyLockedException { + // Mock the lock service to return locks + 
when(lockService.lock(eq("first-resource"), anyString())) + .thenReturn(Optional.of(firstLock)); + + // Acquire first lock successfully + refreshingLockService.acquireAndKeepLock("first-resource", "context-1"); + + // Attempt to acquire second lock while still holding the first + assertThatThrownBy(() -> refreshingLockService.acquireAndKeepLock("second-resource", "context-2")) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Unable to acquire new lock, already holding lock that would get lost"); + } + + @Test + void throwsIllegalStateExceptionWhenAcquiringLockWithMaxConcurrencyWhileAlreadyHoldingOne() throws AlreadyLockedException { + // Mock the lock service to return locks + when(lockService.lock(eq("first-resource"), eq(1))) + .thenReturn(Optional.of(firstLock)); + + // Acquire first lock successfully + refreshingLockService.acquireAndKeepLock("first-resource", 1); + + // Attempt to acquire second lock while still holding the first + assertThatThrownBy(() -> refreshingLockService.acquireAndKeepLock("second-resource", 1)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Unable to acquire new lock, already holding lock that would get lost"); + } +} diff --git a/graylog2-server/src/test/java/org/graylog2/shared/buffers/processors/DecodingProcessorTest.java b/graylog2-server/src/test/java/org/graylog2/shared/buffers/processors/DecodingProcessorTest.java new file mode 100644 index 0000000000..f0690b4206 --- /dev/null +++ b/graylog2-server/src/test/java/org/graylog2/shared/buffers/processors/DecodingProcessorTest.java @@ -0,0 +1,210 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + */ +package org.graylog2.shared.buffers.processors; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import org.graylog.failure.FailureSubmissionService; +import org.graylog2.plugin.Message; +import org.graylog2.plugin.ServerStatus; +import org.graylog2.plugin.TestMessageFactory; +import org.graylog2.plugin.Tools; +import org.graylog2.plugin.buffers.MessageEvent; +import org.graylog2.plugin.configuration.Configuration; +import org.graylog2.plugin.inputs.codecs.Codec; +import org.graylog2.plugin.inputs.failure.InputProcessingException; +import org.graylog2.plugin.journal.RawMessage; +import org.graylog2.plugin.system.SimpleNodeId; +import org.graylog2.shared.messageq.MessageQueueAcknowledger; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Answers; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class DecodingProcessorTest { + + private static final String CODEC_NAME = "test-codec"; + private static final String 
INPUT_ID = "input-id"; + private static final String NODE_ID = "node-id"; + + @Mock + private Codec codec; + + @Mock + private Codec.Factory codecFactory; + + @Mock + private ServerStatus serverStatus; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private MetricRegistry metricRegistry; + + @Mock + private MessageQueueAcknowledger acknowledger; + + @Mock + private FailureSubmissionService failureSubmissionService; + + @Captor + private ArgumentCaptor<InputProcessingException> exceptionCaptor; + + private DecodingProcessor processor; + + @BeforeEach + void setUp() { + Timer decodeTimer = new Timer(); + Timer parseTimer = new Timer(); + + processor = new DecodingProcessor( + Map.of(CODEC_NAME, codecFactory), + serverStatus, + metricRegistry, + acknowledger, + failureSubmissionService, + decodeTimer, + parseTimer + ); + } + + private void setUpCodecFactory() { + when(codecFactory.create(any(Configuration.class))).thenReturn(codec); + } + + @Test + void validMessageIsDecodedAndSetOnEvent() throws Exception { + setUpCodecFactory(); + when(serverStatus.getDetailedMessageRecordingStrategy()) + .thenReturn(ServerStatus.MessageDetailRecordingStrategy.NEVER); + Message decoded = new TestMessageFactory().createMessage("message", "source", Tools.nowUTC()); + when(codec.decodeSafe(any(RawMessage.class))).thenReturn(Optional.of(decoded)); + + MessageEvent event = createEvent("valid-payload"); + assertThat(event.getMessage()).isNull(); + + processor.onEvent(event, 0, true); + + verify(failureSubmissionService, never()).submitInputFailure(any(), any()); + assertThat(event.getMessage()).isNotNull(); + } + + @Test + void inputProcessingExceptionIsSubmittedAsFailure() throws Exception { + setUpCodecFactory(); + + final String errorMessage = "GELF message is missing mandatory 'short_message' field."; + final String payload = "invalid-gelf"; + + RawMessage rawMessage = createRawMessage(payload); + InputProcessingException exception = InputProcessingException.create( + errorMessage, + new 
IllegalArgumentException("Missing field: short_message"), + rawMessage, + payload + ); + + when(codec.decodeSafe(any(RawMessage.class))).thenThrow(exception); + + MessageEvent event = new MessageEvent(); + event.setRaw(rawMessage); + + processor.onEvent(event, 0, true); + + verify(failureSubmissionService).submitInputFailure(exceptionCaptor.capture(), eq(INPUT_ID)); + + InputProcessingException captured = exceptionCaptor.getValue(); + assertThat(captured.getMessage()).isEqualTo(errorMessage); + assertThat(captured.getRawMessage()).isSameAs(rawMessage); + assertThat(captured.getRawMessage().getPayload()).isEqualTo(payload.getBytes(StandardCharsets.UTF_8)); + + // Message is acknowledged twice (in the catch and finally block) + verify(acknowledger, times(2)).acknowledge(rawMessage.getMessageQueueId()); + } + + @Test + void runtimeExceptionIsSubmittedAsFailure() throws Exception { + setUpCodecFactory(); + RawMessage rawMessage = createRawMessage("payload"); + + final String exceptionMessage = "Unable to decode raw message due to an unexpected error."; + final String exceptionCause = "unexpected codec error."; + + when(codec.decodeSafe(any(RawMessage.class))) + .thenThrow(new RuntimeException(exceptionCause)); + + MessageEvent event = new MessageEvent(); + event.setRaw(rawMessage); + + processor.onEvent(event, 0, true); + + verify(failureSubmissionService).submitInputFailure(exceptionCaptor.capture(), eq(INPUT_ID)); + + InputProcessingException captured = exceptionCaptor.getValue(); + assertThat(captured.getMessage()).isEqualTo(exceptionMessage); + assertThat(captured.getCause()).isInstanceOf(RuntimeException.class); + assertThat(captured.getCause().getMessage()).isEqualTo(exceptionCause); + + // Message is acknowledged twice (in the catch and finally block) + verify(acknowledger, times(2)).acknowledge(rawMessage.getMessageQueueId()); + } + + @Test + void missingCodecFactorySkipsMessageWithoutFailure() throws Exception { + RawMessage rawMessage = 
createRawMessage("payload"); + rawMessage.setCodecName("unknown-codec"); + + MessageEvent event = new MessageEvent(); + event.setRaw(rawMessage); + + processor.onEvent(event, 0, true); + + // Message is skipped when no codec factory is found + verify(failureSubmissionService, never()).submitInputFailure(any(), any()); + + verify(acknowledger).acknowledge(rawMessage.getMessageQueueId()); + } + + private MessageEvent createEvent(String payload) { + MessageEvent event = new MessageEvent(); + event.setRaw(createRawMessage(payload)); + return event; + } + + private RawMessage createRawMessage(String payload) { + RawMessage raw = new RawMessage(payload.getBytes(StandardCharsets.UTF_8)); + raw.setCodecName(CODEC_NAME); + raw.setCodecConfig(Configuration.EMPTY_CONFIGURATION); + raw.addSourceNode(INPUT_ID, new SimpleNodeId(NODE_ID)); + return raw; + } +} diff --git a/graylog2-web-interface/src/util/NumberUtils.ts b/graylog2-web-interface/src/util/NumberUtils.ts index 54e1494ab6..ba9ce36d99 100644 --- a/graylog2-web-interface/src/util/NumberUtils.ts +++ b/graylog2-web-interface/src/util/NumberUtils.ts @@ -21,6 +21,7 @@ type NumberInput = number | string; const NumberUtils = { JAVA_INTEGER_MIN_VALUE: 2 ** 31 * -1, JAVA_INTEGER_MAX_VALUE: 2 ** 31 - 1, + BYTES_PER_GB: 1_000_000_000 as const, normalizeNumber(number: NumberInput): number { switch (number) { case 'NaN': @@ -72,6 +73,27 @@ const NumberUtils = { return formattedNumber; }, + formatDecimalBytes(number: NumberInput): string { + numeral.zeroFormat('0B'); + + let formattedNumber: string; + + try { + formattedNumber = numeral(this.normalizeNumber(number)).format('0.0b'); + } catch (_e) { + formattedNumber = String(number); + } + + numeral.zeroFormat(null); + + return formattedNumber; + }, + bytesToGb(bytes: number): number { + return parseFloat((bytes / this.BYTES_PER_GB).toFixed(2)); + }, + gbToBytes(gb: number): number { + return Math.round(gb * this.BYTES_PER_GB); + }, isNumber(possibleNumber: unknown): boolean { 
return possibleNumber !== '' && !Number.isNaN(Number(possibleNumber)); },