Merge branch 'master' into refactor/improve-configuration-state-updater

This commit is contained in:
Dennis Oelkers
2026-03-12 16:18:13 +01:00
committed by GitHub
8 changed files with 421 additions and 10 deletions

View File

@@ -0,0 +1,4 @@
type = "a"
message = "Add support for handling input failures received from the forwarder."
pulls = ["25025"]

View File

@@ -218,6 +218,15 @@ public class FailureSubmissionService {
}
}
public void submitInputFailure(InputFailure inputFailure) {
try {
failureSubmissionQueue.submitBlocking(FailureBatch.inputFailureBatch(List.of(inputFailure)));
} catch (InterruptedException ignored) {
logger.warn("Failed to submit an input failure for failure handling. The thread has been interrupted!");
Thread.currentThread().interrupt();
}
}
private IndexingFailure fromIndexingError(IndexingError indexingError) {
return new IndexingFailure(
indexingError.error().type() == MappingError ?

View File

@@ -25,6 +25,7 @@ import org.joda.time.DateTime;
import java.util.Optional;
import static org.graylog2.plugin.Message.FIELD_GL2_FORWARDER_INPUT;
import static org.graylog2.plugin.Message.FIELD_GL2_SOURCE_INPUT;
import static org.graylog2.plugin.Message.FIELD_GL2_SOURCE_NODE;
import static org.graylog2.plugin.Message.FIELD_SOURCE;
@@ -37,6 +38,8 @@ public class InputFailure implements Failure {
private final DateTime failureTimestamp;
private final RawMessage rawMessage;
private final String originalMessage;
@Nullable
private String forwarderInputId;
public InputFailure(@Nonnull FailureCause failureCause,
@Nonnull String failureMessage,
@@ -108,6 +111,7 @@ public class InputFailure implements Failure {
.put(FIELD_GL2_SOURCE_INPUT, sourceNode.inputId)
.put(FIELD_GL2_SOURCE_NODE, sourceNode.nodeId)
);
builder.put(FIELD_GL2_FORWARDER_INPUT, forwarderInputId);
Optional.ofNullable(rawMessage.getRemoteAddress()).ifPresent(address ->
builder.put(FIELD_SOURCE, address.toString()));
@@ -123,4 +127,17 @@ public class InputFailure implements Failure {
public Object getMessageQueueId() {
return null;
}
public RawMessage getRawMessage() {
return rawMessage;
}
@Nullable
public String getForwarderInputId() {
return forwarderInputId;
}
public void setForwarderInputId(@Nullable String forwarderInputId) {
this.forwarderInputId = forwarderInputId;
}
}

View File

@@ -26,6 +26,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.graylog2.shared.utilities.StringUtils.f;
@@ -61,11 +62,10 @@ public class RefreshingLockService implements AutoCloseable {
* @throws AlreadyLockedException when the resource couldn't be locked
*/
public void acquireAndKeepLock(String resource, int maxConcurrency) throws AlreadyLockedException {
Optional<Lock> optionalLock = lockService.lock(resource, maxConcurrency);
if (optionalLock.isEmpty()) {
throw new AlreadyLockedException(f("Could not acquire lock for resource <%s> with max concurrency <%d>", resource, maxConcurrency));
}
scheduleLock(optionalLock.get());
assertNoLockYet();
final var newLock = lockService.lock(resource, maxConcurrency)
.orElseThrow(() -> new AlreadyLockedException(f("Could not acquire lock for resource <%s> with max concurrency <%d>", resource, maxConcurrency)));
scheduleLock(newLock);
}
/**
@@ -76,12 +76,15 @@ public class RefreshingLockService implements AutoCloseable {
* @throws AlreadyLockedException when the resource couldn't be locked
*/
public void acquireAndKeepLock(String resource, String lockContext) throws AlreadyLockedException {
assertNoLockYet();
checkArgument(!isNullOrEmpty(lockContext), "lockContext cannot be blank");
Optional<Lock> optionalLock = lockService.lock(resource, lockContext);
if (optionalLock.isEmpty()) {
throw new AlreadyLockedException(f("Could not acquire lock for resource <%s> and lock context <%s>", resource, lockContext));
}
scheduleLock(optionalLock.get());
final var newLock = lockService.lock(resource, lockContext)
.orElseThrow(() -> new AlreadyLockedException(f("Could not acquire lock for resource <%s> and lock context <%s>", resource, lockContext)));
scheduleLock(newLock);
}
private void assertNoLockYet() {
checkState(lock == null, "Unable to acquire new lock, already holding lock that would get lost: " + lock);
}
private void scheduleLock(Lock newLock) {

View File

@@ -19,6 +19,7 @@ package org.graylog.failure;
import org.graylog2.indexer.messages.IndexingError;
import org.graylog2.inputs.diagnosis.InputDiagnosisMetrics;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.inputs.failure.InputProcessingException;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.shared.bindings.providers.ObjectMapperProvider;
@@ -34,6 +35,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.graylog.failure.InputFailureCause.INPUT_PARSE;
import static org.graylog2.indexer.messages.IndexingError.Type.MappingError;
import static org.graylog2.indexer.messages.IndexingError.Type.Unknown;
import static org.mockito.Mockito.lenient;
@@ -416,4 +418,38 @@ class FailureSubmissionServiceTest {
assertThat(fb.getFailures().get(0)).satisfies(indexingFailure ->
assertThat(indexingFailure.failureDetails()).isEqualTo("IllegalArgumentException: rootCauseMessage")));
}
@Test
void submitInputFailure() throws Exception {
final String failureMessage = "failure-message";
final String failureDetails = "failure-details";
InputFailure inputFailure = new InputFailure(
INPUT_PARSE,
failureMessage,
failureDetails,
Tools.nowUTC(),
new RawMessage(new byte[]{1, 2, 3}),
"original-payload"
);
underTest.submitInputFailure(inputFailure);
verify(failureSubmissionQueue, times(1)).submitBlocking(failureBatchCaptor.capture());
assertThat(failureBatchCaptor.getValue()).satisfies(batch -> {
assertThat(batch.containsInputFailures()).isTrue();
assertThat(batch.size()).isEqualTo(1);
assertThat(batch.getFailures().getFirst())
.isSameAs(inputFailure)
.satisfies(failure -> {
assertThat(failure.failureType()).isEqualTo(FailureType.INPUT);
assertThat(failure.failureCause().label()).isEqualTo(INPUT_PARSE.label());
assertThat(failure.message()).isEqualTo(failureMessage);
assertThat(failure.failureDetails()).isEqualTo(failureDetails);
assertThat(failure.failureTimestamp()).isNotNull();
assertThat(failure.requiresAcknowledgement()).isFalse();
});
});
}
}

View File

@@ -0,0 +1,110 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.cluster.lock;
import org.graylog2.shared.SuppressForbidden;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.time.Duration;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class RefreshingLockServiceTest {
@Mock
private LockService lockService;
private ScheduledExecutorService scheduler;
private RefreshingLockService refreshingLockService;
private final Lock firstLock = Lock.builder()
.resource("first-resource")
.lockedBy("node-123")
.createdAt(ZonedDateTime.now(ZoneOffset.UTC))
.updatedAt(ZonedDateTime.now(ZoneOffset.UTC))
.build();
private final Lock secondLock = Lock.builder()
.resource("second-resource")
.lockedBy("node-123")
.createdAt(ZonedDateTime.now(ZoneOffset.UTC))
.updatedAt(ZonedDateTime.now(ZoneOffset.UTC))
.build();
@BeforeEach
@SuppressForbidden("Using Executors.newSingleThreadScheduledExecutor() is okay in tests")
void setUp() {
scheduler = Executors.newSingleThreadScheduledExecutor();
refreshingLockService = new RefreshingLockService(
lockService,
scheduler,
Duration.ofMinutes(5)
);
}
@AfterEach
void tearDown() {
if (scheduler != null) {
scheduler.shutdownNow();
}
}
@Test
void throwsIllegalStateExceptionWhenAcquiringLockWhileAlreadyHoldingOne() throws AlreadyLockedException {
// Mock the lock service to return locks
when(lockService.lock(eq("first-resource"), anyString()))
.thenReturn(Optional.of(firstLock));
// Acquire first lock successfully
refreshingLockService.acquireAndKeepLock("first-resource", "context-1");
// Attempt to acquire second lock while still holding the first
assertThatThrownBy(() -> refreshingLockService.acquireAndKeepLock("second-resource", "context-2"))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("Unable to acquire new lock, already holding lock that would get lost");
}
@Test
void throwsIllegalStateExceptionWhenAcquiringLockWithMaxConcurrencyWhileAlreadyHoldingOne() throws AlreadyLockedException {
// Mock the lock service to return locks
when(lockService.lock(eq("first-resource"), eq(1)))
.thenReturn(Optional.of(firstLock));
// Acquire first lock successfully
refreshingLockService.acquireAndKeepLock("first-resource", 1);
// Attempt to acquire second lock while still holding the first
assertThatThrownBy(() -> refreshingLockService.acquireAndKeepLock("second-resource", 1))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("Unable to acquire new lock, already holding lock that would get lost");
}
}

View File

@@ -0,0 +1,210 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.shared.buffers.processors;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import org.graylog.failure.FailureSubmissionService;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.TestMessageFactory;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.buffers.MessageEvent;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.inputs.codecs.Codec;
import org.graylog2.plugin.inputs.failure.InputProcessingException;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.plugin.system.SimpleNodeId;
import org.graylog2.shared.messageq.MessageQueueAcknowledger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class DecodingProcessorTest {
private static final String CODEC_NAME = "test-codec";
private static final String INPUT_ID = "input-id";
private static final String NODE_ID = "node-id";
@Mock
private Codec codec;
@Mock
private Codec.Factory<Codec> codecFactory;
@Mock
private ServerStatus serverStatus;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private MetricRegistry metricRegistry;
@Mock
private MessageQueueAcknowledger acknowledger;
@Mock
private FailureSubmissionService failureSubmissionService;
@Captor
private ArgumentCaptor<InputProcessingException> exceptionCaptor;
private DecodingProcessor processor;
@BeforeEach
void setUp() {
Timer decodeTimer = new Timer();
Timer parseTimer = new Timer();
processor = new DecodingProcessor(
Map.of(CODEC_NAME, codecFactory),
serverStatus,
metricRegistry,
acknowledger,
failureSubmissionService,
decodeTimer,
parseTimer
);
}
private void setUpCodecFactory() {
when(codecFactory.create(any(Configuration.class))).thenReturn(codec);
}
@Test
void validMessageIsDecodedAndSetOnEvent() throws Exception {
setUpCodecFactory();
when(serverStatus.getDetailedMessageRecordingStrategy())
.thenReturn(ServerStatus.MessageDetailRecordingStrategy.NEVER);
Message decoded = new TestMessageFactory().createMessage("message", "source", Tools.nowUTC());
when(codec.decodeSafe(any(RawMessage.class))).thenReturn(Optional.of(decoded));
MessageEvent event = createEvent("valid-payload");
assertThat(event.getMessage()).isNull();
processor.onEvent(event, 0, true);
verify(failureSubmissionService, never()).submitInputFailure(any(), any());
assertThat(event.getMessage()).isNotNull();
}
@Test
void inputProcessingExceptionIsSubmittedAsFailure() throws Exception {
setUpCodecFactory();
final String errorMessage = "GELF message is missing mandatory 'short_message' field.";
final String payload = "invalid-gelf";
RawMessage rawMessage = createRawMessage(payload);
InputProcessingException exception = InputProcessingException.create(
errorMessage,
new IllegalArgumentException("Missing field: short_message"),
rawMessage,
payload
);
when(codec.decodeSafe(any(RawMessage.class))).thenThrow(exception);
MessageEvent event = new MessageEvent();
event.setRaw(rawMessage);
processor.onEvent(event, 0, true);
verify(failureSubmissionService).submitInputFailure(exceptionCaptor.capture(), eq(INPUT_ID));
InputProcessingException captured = exceptionCaptor.getValue();
assertThat(captured.getMessage()).isEqualTo(errorMessage);
assertThat(captured.getRawMessage()).isSameAs(rawMessage);
assertThat(captured.getRawMessage().getPayload()).isEqualTo(payload.getBytes(StandardCharsets.UTF_8));
// Message is acknowledged twice (in the catch and finally block)
verify(acknowledger, times(2)).acknowledge(rawMessage.getMessageQueueId());
}
@Test
void runtimeExceptionIsSubmittedAsFailure() throws Exception {
setUpCodecFactory();
RawMessage rawMessage = createRawMessage("payload");
final String exceptionMessage = "Unable to decode raw message due to an unexpected error.";
final String exceptionCause = "unexpected codec error.";
when(codec.decodeSafe(any(RawMessage.class)))
.thenThrow(new RuntimeException(exceptionCause));
MessageEvent event = new MessageEvent();
event.setRaw(rawMessage);
processor.onEvent(event, 0, true);
verify(failureSubmissionService).submitInputFailure(exceptionCaptor.capture(), eq(INPUT_ID));
InputProcessingException captured = exceptionCaptor.getValue();
assertThat(captured.getMessage()).isEqualTo(exceptionMessage);
assertThat(captured.getCause()).isInstanceOf(RuntimeException.class);
assertThat(captured.getCause().getMessage()).isEqualTo(exceptionCause);
// Message is acknowledged twice (in the catch and finally block)
verify(acknowledger, times(2)).acknowledge(rawMessage.getMessageQueueId());
}
@Test
void missingCodecFactorySkipsMessageWithoutFailure() throws Exception {
RawMessage rawMessage = createRawMessage("payload");
rawMessage.setCodecName("unknown-codec");
MessageEvent event = new MessageEvent();
event.setRaw(rawMessage);
processor.onEvent(event, 0, true);
// Message is skipped when no codec factory is found
verify(failureSubmissionService, never()).submitInputFailure(any(), any());
verify(acknowledger).acknowledge(rawMessage.getMessageQueueId());
}
private MessageEvent createEvent(String payload) {
MessageEvent event = new MessageEvent();
event.setRaw(createRawMessage(payload));
return event;
}
private RawMessage createRawMessage(String payload) {
RawMessage raw = new RawMessage(payload.getBytes(StandardCharsets.UTF_8));
raw.setCodecName(CODEC_NAME);
raw.setCodecConfig(Configuration.EMPTY_CONFIGURATION);
raw.addSourceNode(INPUT_ID, new SimpleNodeId(NODE_ID));
return raw;
}
}

View File

@@ -21,6 +21,7 @@ type NumberInput = number | string;
const NumberUtils = {
JAVA_INTEGER_MIN_VALUE: 2 ** 31 * -1,
JAVA_INTEGER_MAX_VALUE: 2 ** 31 - 1,
BYTES_PER_GB: 1_000_000_000 as const,
normalizeNumber(number: NumberInput): number {
switch (number) {
case 'NaN':
@@ -72,6 +73,27 @@ const NumberUtils = {
return formattedNumber;
},
formatDecimalBytes(number: NumberInput): string {
numeral.zeroFormat('0B');
let formattedNumber: string;
try {
formattedNumber = numeral(this.normalizeNumber(number)).format('0.0b');
} catch (_e) {
formattedNumber = String(number);
}
numeral.zeroFormat(null);
return formattedNumber;
},
bytesToGb(bytes: number): number {
return parseFloat((bytes / this.BYTES_PER_GB).toFixed(2));
},
gbToBytes(gb: number): number {
return Math.round(gb * this.BYTES_PER_GB);
},
isNumber(possibleNumber: unknown): boolean {
return possibleNumber !== '' && !Number.isNaN(Number(possibleNumber));
},