feat(collectors): add extensible SourceConfig type hierarchy with validation

This commit is contained in:
Kay Roepke
2026-02-17 13:59:01 +01:00
parent a816a7f3a0
commit 60cd0d6d71
11 changed files with 539 additions and 0 deletions

View File

@@ -0,0 +1,49 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.collectors.db;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import jakarta.annotation.Nullable;
import java.util.List;
/**
 * Source configuration of type {@value #TYPE_NAME}, describing a set of file paths to read.
 *
 * @param paths     value of the {@code paths} JSON property; {@link #validate()} requires a non-empty
 *                  list without null or blank entries
 * @param readMode  value of the {@code read_mode} JSON property; {@link #validate()} requires it to be
 *                  non-blank
 * @param multiline value of the optional {@code multiline} JSON property, may be {@code null}
 */
@JsonTypeName(FileSourceConfig.TYPE_NAME)
@JsonIgnoreProperties(value = SourceConfig.TYPE_FIELD, allowGetters = true)
public record FileSourceConfig(
        @JsonProperty("paths") List<String> paths,
        @JsonProperty("read_mode") String readMode,
        @Nullable @JsonProperty("multiline") MultilineConfig multiline
) implements SourceConfig {
    public static final String TYPE_NAME = "file";

    /**
     * Returns the fixed type name of this config.
     *
     * @return always {@link #TYPE_NAME}
     */
    @Override
    public String type() {
        return TYPE_NAME;
    }

    /**
     * Checks that at least one usable path and a read mode are present.
     *
     * @throws IllegalArgumentException if {@code paths} is null, empty, or contains a null or blank
     *                                  entry, or if {@code readMode} is null or blank
     */
    @Override
    public void validate() {
        if (paths == null || paths.isEmpty()) {
            throw new IllegalArgumentException("FileSourceConfig requires at least one path");
        }
        // A list such as [""] or [null] is non-empty but still carries no usable path.
        if (paths.stream().anyMatch(path -> path == null || path.isBlank())) {
            throw new IllegalArgumentException("FileSourceConfig paths must not contain null or blank entries");
        }
        if (readMode == null || readMode.isBlank()) {
            throw new IllegalArgumentException("FileSourceConfig requires a non-blank read_mode");
        }
    }
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.collectors.db;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import jakarta.annotation.Nullable;
import java.util.List;
/**
 * Source configuration of type {@value #TYPE_NAME}, describing a set of units to read from.
 *
 * @param units        value of the {@code units} JSON property; {@link #validate()} requires a
 *                     non-empty list without null or blank entries
 * @param matchPattern value of the optional {@code match_pattern} JSON property, may be {@code null}
 */
@JsonTypeName(JournaldSourceConfig.TYPE_NAME)
@JsonIgnoreProperties(value = SourceConfig.TYPE_FIELD, allowGetters = true)
public record JournaldSourceConfig(
        @JsonProperty("units") List<String> units,
        @Nullable @JsonProperty("match_pattern") String matchPattern
) implements SourceConfig {
    public static final String TYPE_NAME = "journald";

    /**
     * Returns the fixed type name of this config.
     *
     * @return always {@link #TYPE_NAME}
     */
    @Override
    public String type() {
        return TYPE_NAME;
    }

    /**
     * Checks that at least one usable unit is present.
     *
     * @throws IllegalArgumentException if {@code units} is null, empty, or contains a null or blank entry
     */
    @Override
    public void validate() {
        if (units == null || units.isEmpty()) {
            throw new IllegalArgumentException("JournaldSourceConfig requires at least one unit");
        }
        // A list such as [""] or [null] is non-empty but still names no unit.
        if (units.stream().anyMatch(unit -> unit == null || unit.isBlank())) {
            throw new IllegalArgumentException("JournaldSourceConfig units must not contain null or blank entries");
        }
    }
}

View File

@@ -0,0 +1,25 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.collectors.db;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
 * Multiline settings that can be attached to a {@link FileSourceConfig}.
 *
 * @param pattern value of the {@code pattern} JSON property
 *                (NOTE(review): presumably a regular expression; confirm against the consumer)
 * @param negate  value of the {@code negate} JSON property
 */
public record MultilineConfig(
        @JsonProperty("pattern") String pattern,
        @JsonProperty("negate") boolean negate
) {
}

View File

@@ -0,0 +1,36 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.collectors.db;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
/**
 * Common contract for all collector source configurations.
 * <p>
 * Jackson selects the concrete implementation by name from the existing {@value #TYPE_FIELD}
 * property of the JSON object. A type name that resolves to no known subtype is deserialized as
 * {@link UnknownSourceConfig}.
 */
@JsonTypeInfo(
        use = JsonTypeInfo.Id.NAME,
        include = JsonTypeInfo.As.EXISTING_PROPERTY,
        property = SourceConfig.TYPE_FIELD,
        defaultImpl = UnknownSourceConfig.class,
        visible = true)
public interface SourceConfig {
    /** Name of the JSON property that carries the type name. */
    String TYPE_FIELD = "type";

    /**
     * Returns the type name of this config, serialized as the {@value #TYPE_FIELD} property.
     *
     * @return the type name
     */
    @JsonProperty(TYPE_FIELD)
    String type();

    /**
     * Validates this config.
     *
     * @throws IllegalArgumentException if the config is invalid
     */
    void validate();
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.collectors.db;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
/**
 * Source configuration of type {@value #TYPE_NAME}, consisting of a bind address and a port.
 *
 * @param bindAddress value of the {@code bind_address} JSON property
 * @param port        value of the {@code port} JSON property
 */
@JsonTypeName(TcpSourceConfig.TYPE_NAME)
@JsonIgnoreProperties(value = SourceConfig.TYPE_FIELD, allowGetters = true)
public record TcpSourceConfig(
        @JsonProperty("bind_address") String bindAddress,
        @JsonProperty("port") int port
) implements SourceConfig {
    public static final String TYPE_NAME = "tcp";

    /**
     * Returns the fixed type name of this config.
     *
     * @return always {@link #TYPE_NAME}
     */
    @Override
    public String type() {
        return TYPE_NAME;
    }

    /**
     * Checks the bind address first, then the port range.
     *
     * @throws IllegalArgumentException if {@code bindAddress} is null or blank, or if {@code port}
     *                                  lies outside 1..65535
     */
    @Override
    public void validate() {
        final boolean bindAddressPresent = bindAddress != null && !bindAddress.isBlank();
        if (!bindAddressPresent) {
            throw new IllegalArgumentException("TcpSourceConfig requires a non-blank bind_address");
        }

        final boolean portInRange = port >= 1 && port <= 65535;
        if (!portInRange) {
            throw new IllegalArgumentException("TcpSourceConfig port must be between 1 and 65535");
        }
    }
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.collectors.db;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
/**
 * Source configuration of type {@value #TYPE_NAME}, consisting of a bind address and a port.
 *
 * @param bindAddress value of the {@code bind_address} JSON property
 * @param port        value of the {@code port} JSON property
 */
@JsonTypeName(UdpSourceConfig.TYPE_NAME)
@JsonIgnoreProperties(value = SourceConfig.TYPE_FIELD, allowGetters = true)
public record UdpSourceConfig(
        @JsonProperty("bind_address") String bindAddress,
        @JsonProperty("port") int port
) implements SourceConfig {
    public static final String TYPE_NAME = "udp";

    /**
     * Returns the fixed type name of this config.
     *
     * @return always {@link #TYPE_NAME}
     */
    @Override
    public String type() {
        return TYPE_NAME;
    }

    /**
     * Checks the bind address first, then the port range.
     *
     * @throws IllegalArgumentException if {@code bindAddress} is null or blank, or if {@code port}
     *                                  lies outside 1..65535
     */
    @Override
    public void validate() {
        final boolean bindAddressPresent = bindAddress != null && !bindAddress.isBlank();
        if (!bindAddressPresent) {
            throw new IllegalArgumentException("UdpSourceConfig requires a non-blank bind_address");
        }

        final boolean portInRange = port >= 1 && port <= 65535;
        if (!portInRange) {
            throw new IllegalArgumentException("UdpSourceConfig port must be between 1 and 65535");
        }
    }
}

View File

@@ -0,0 +1,51 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.collectors.db;
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
/**
 * Fallback {@link SourceConfig} that keeps the type name and every other JSON property of a config
 * as-is, in the order the properties were set.
 * <p>
 * Two instances are equal when their type name and additional properties are equal, matching the
 * value semantics of the record-based {@link SourceConfig} implementations.
 */
public class UnknownSourceConfig implements SourceConfig {
    // Not final: populated by Jackson through the annotated field.
    @JsonProperty(TYPE_FIELD)
    private String type;

    // LinkedHashMap preserves insertion order of the captured properties.
    private final Map<String, Object> additionalProperties = new LinkedHashMap<>();

    /**
     * Returns the type name as read from the {@code type} property.
     *
     * @return the type name, or {@code null} if it was never set
     */
    @Override
    public String type() {
        return type;
    }

    /**
     * Returns all properties captured through {@link #setAdditionalProperty(String, Object)}.
     *
     * @return the live backing map of additional properties
     */
    @JsonAnyGetter
    public Map<String, Object> additionalProperties() {
        return additionalProperties;
    }

    /**
     * Stores a property that is not otherwise mapped on this class.
     *
     * @param key   property name
     * @param value property value
     */
    @JsonAnySetter
    public void setAdditionalProperty(String key, Object value) {
        additionalProperties.put(key, value);
    }

    @Override
    public void validate() {
        // Unknown configs are always valid — we preserve them as-is.
    }

    /**
     * Compares by type name and additional properties.
     *
     * @param o object to compare with
     * @return {@code true} if {@code o} is an {@code UnknownSourceConfig} with equal content
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        return o instanceof UnknownSourceConfig other
                && Objects.equals(type, other.type)
                && additionalProperties.equals(other.additionalProperties);
    }

    /**
     * Hash code consistent with {@link #equals(Object)}.
     *
     * @return hash of type name and additional properties
     */
    @Override
    public int hashCode() {
        return Objects.hash(type, additionalProperties);
    }
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.collectors.db;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import jakarta.annotation.Nullable;
import java.util.List;
/**
 * Source configuration of type {@value #TYPE_NAME}, describing a set of channels to read from.
 *
 * @param channels value of the {@code channels} JSON property; {@link #validate()} requires a
 *                 non-empty list without null or blank entries
 * @param query    value of the optional {@code query} JSON property, may be {@code null}
 */
@JsonTypeName(WindowsEventLogSourceConfig.TYPE_NAME)
@JsonIgnoreProperties(value = SourceConfig.TYPE_FIELD, allowGetters = true)
public record WindowsEventLogSourceConfig(
        @JsonProperty("channels") List<String> channels,
        @Nullable @JsonProperty("query") String query
) implements SourceConfig {
    public static final String TYPE_NAME = "windows_event_log";

    /**
     * Returns the fixed type name of this config.
     *
     * @return always {@link #TYPE_NAME}
     */
    @Override
    public String type() {
        return TYPE_NAME;
    }

    /**
     * Checks that at least one usable channel is present.
     *
     * @throws IllegalArgumentException if {@code channels} is null, empty, or contains a null or
     *                                  blank entry
     */
    @Override
    public void validate() {
        if (channels == null || channels.isEmpty()) {
            throw new IllegalArgumentException("WindowsEventLogSourceConfig requires at least one channel");
        }
        // A list such as [""] or [null] is non-empty but still names no channel.
        if (channels.stream().anyMatch(channel -> channel == null || channel.isBlank())) {
            throw new IllegalArgumentException("WindowsEventLogSourceConfig channels must not contain null or blank entries");
        }
    }
}

View File

@@ -0,0 +1,133 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.collectors.db;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import org.graylog2.shared.bindings.providers.ObjectMapperProvider;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/**
 * Covers JSON (de)serialization and validation of the {@link SourceConfig} type hierarchy.
 */
class SourceConfigTest {
    private ObjectMapper objectMapper;

    /** Creates a fresh mapper and registers the known source config subtypes under their type names. */
    @BeforeEach
    void setUp() {
        objectMapper = new ObjectMapperProvider().get();
        objectMapper.registerSubtypes(
                new NamedType(FileSourceConfig.class, FileSourceConfig.TYPE_NAME),
                new NamedType(JournaldSourceConfig.class, JournaldSourceConfig.TYPE_NAME),
                new NamedType(WindowsEventLogSourceConfig.class, WindowsEventLogSourceConfig.TYPE_NAME),
                new NamedType(TcpSourceConfig.class, TcpSourceConfig.TYPE_NAME),
                new NamedType(UdpSourceConfig.class, UdpSourceConfig.TYPE_NAME)
        );
    }

    /** Reads the given JSON through the polymorphic {@link SourceConfig} base type. */
    private SourceConfig parse(String json) throws Exception {
        return objectMapper.readValue(json, SourceConfig.class);
    }

    /** A "file" payload must yield a fully populated {@link FileSourceConfig}. */
    @Test
    void deserializeFileSource() throws Exception {
        final var payload = """
                {
                "type": "file",
                "paths": ["/var/log/syslog", "/var/log/auth.log"],
                "read_mode": "tail",
                "multiline": {
                "pattern": "^\\\\d{4}-",
                "negate": true
                }
                }
                """;

        final var parsed = parse(payload);

        assertThat(parsed).isInstanceOfSatisfying(FileSourceConfig.class, file -> {
            assertThat(file.type()).isEqualTo("file");
            assertThat(file.paths()).containsExactly("/var/log/syslog", "/var/log/auth.log");
            assertThat(file.readMode()).isEqualTo("tail");
            assertThat(file.multiline()).isNotNull();
            assertThat(file.multiline().pattern()).isEqualTo("^\\d{4}-");
            assertThat(file.multiline().negate()).isTrue();
        });
    }

    /** A "tcp" payload must yield a {@link TcpSourceConfig} with address and port. */
    @Test
    void deserializeTcpSource() throws Exception {
        final var payload = """
                {
                "type": "tcp",
                "bind_address": "0.0.0.0",
                "port": 5140
                }
                """;

        final var parsed = parse(payload);

        assertThat(parsed).isInstanceOfSatisfying(TcpSourceConfig.class, tcp -> {
            assertThat(tcp.type()).isEqualTo("tcp");
            assertThat(tcp.bindAddress()).isEqualTo("0.0.0.0");
            assertThat(tcp.port()).isEqualTo(5140);
        });
    }

    /** An unregistered type name must fall back to {@link UnknownSourceConfig} and keep its fields. */
    @Test
    void unknownTypeFallsBackToUnknownSourceConfig() throws Exception {
        final var payload = """
                {
                "type": "custom_enterprise_type",
                "some_field": "some_value",
                "nested": {"key": 42}
                }
                """;

        final var parsed = parse(payload);

        assertThat(parsed).isInstanceOf(UnknownSourceConfig.class);
        assertThat(parsed.type()).isEqualTo("custom_enterprise_type");
        final var fallback = (UnknownSourceConfig) parsed;
        assertThat(fallback.additionalProperties()).containsEntry("some_field", "some_value");
    }

    /** An empty path list must be rejected. */
    @Test
    void fileSourceValidation() {
        final var withoutPaths = new FileSourceConfig(List.of(), "tail", null);

        assertThatThrownBy(() -> withoutPaths.validate()).isInstanceOf(IllegalArgumentException.class);
    }

    /** Port 0 lies outside the accepted range and must be rejected. */
    @Test
    void tcpSourcePortValidation() {
        final var withPortZero = new TcpSourceConfig("0.0.0.0", 0);

        assertThatThrownBy(() -> withPortZero.validate()).isInstanceOf(IllegalArgumentException.class);
    }

    /** Serializing and re-reading a file config must produce an equal value. */
    @Test
    void jsonRoundTrip() throws Exception {
        final var multiline = new MultilineConfig("^\\d{4}-", true);
        final var original = new FileSourceConfig(List.of("/var/log/syslog"), "tail", multiline);

        final var serialized = objectMapper.writeValueAsString(original);
        final var restored = parse(serialized);

        assertThat(restored).isEqualTo(original);
    }
}

63
log4j2.xml Normal file
View File

@@ -0,0 +1,63 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Log4j 2 configuration: a console appender, an in-memory appender and per-logger level overrides. -->
<Configuration packages="org.graylog2.log4j" shutdownHook="disable">
<Appenders>
<!-- Writes log events to standard output. -->
<Console name="STDOUT" target="SYSTEM_OUT">
<PatternLayout pattern="%d %-5p: %c - %m%n"/>
</Console>
<!-- Internal Graylog log appender. Please do not disable. This makes internal log messages available via REST calls. -->
<Memory name="graylog-internal-logs" bufferSizeBytes="10MB">
<PatternLayout pattern="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX} %-5p [%c{1}] %m%n"/>
</Memory>
</Appenders>
<Loggers>
<!-- Application Loggers -->
<Logger name="org.graylog2" level="info"/>
<Logger name="com.github.joschi.jadconfig" level="warn"/>
<!-- Prevent DEBUG message about Lucene Expressions not found. -->
<Logger name="org.elasticsearch.script" level="warn"/>
<!-- Disable messages from the version check -->
<Logger name="org.graylog2.periodical.VersionCheckThread" level="off"/>
<!-- Silence chatty natty -->
<Logger name="com.joestelmach.natty.Parser" level="warn"/>
<!-- Silence Kafka log chatter -->
<Logger name="org.graylog.shaded.kafka09.log.Log" level="warn"/>
<Logger name="org.graylog.shaded.kafka09.log.OffsetIndex" level="warn"/>
<Logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="warn"/>
<Logger name="org.apache.kafka.clients.producer.ProducerConfig" level="warn"/>
<!-- Silence useless session validation messages -->
<Logger name="org.apache.shiro.session.mgt.AbstractValidatingSessionManager" level="warn"/>
<!-- Silence informational Azure SDK messages -->
<Logger name="com.azure" level="warn"/>
<Logger name="reactor.core.publisher.Operators" level="off"/>
<Logger name="com.azure.messaging.eventhubs.PartitionPumpManager" level="off"/>
<Logger name="com.azure.core.amqp.implementation.ReactorReceiver" level="off"/>
<Logger name="com.azure.core.amqp.implementation.ReactorDispatcher" level="off"/>
<Logger name="com.azure.core.amqp.implementation.MessageFlux" level="off"/>
<!-- Silence Apache Hadoop/Avro log chatter -->
<Logger name="org.apache.hadoop" level="warn"/>
<Logger name="org.apache.parquet.hadoop.InternalParquetRecordReader" level="warn"/>
<Logger name="org.apache.avro.Schema" level="error"/>
<!-- Silence Selenium log chatter -->
<Logger name="org.openqa.selenium.devtools.Connection" level="warn"/>
<!-- Silence Apache Iceberg log chatter -->
<Logger name="org.apache.iceberg.BaseMetastoreCatalog" level="warn"/>
<Logger name="org.apache.iceberg.BaseMetastoreTableOperations" level="warn"/>
<Logger name="org.apache.iceberg.CatalogUtil" level="warn"/>
<Logger name="org.apache.iceberg.IncrementalFileCleanup" level="warn"/>
<Logger name="org.apache.iceberg.SnapshotProducer" level="warn"/>
<Logger name="org.apache.iceberg.SnapshotScan" level="warn"/>
<!-- The nested filter drops the "error occurred while aborting the stream" messages from this logger. -->
<Logger name="org.apache.iceberg.aws.s3.S3InputStream" level="warn">
<RegexFilter regex=".*An error occurred while aborting the stream.*" onMatch="DENY" onMismatch="ACCEPT"/>
</Logger>
<Logger name="org.apache.iceberg.metrics.LoggingMetricsReporter" level="warn"/>
<!-- NOTE(review): this is the only logger in this file set to debug; confirm it is intended and not a leftover development setting. -->
<Logger name="org.graylog2.opamp" level="debug"/>
<!-- Silence AWS Kinesis log chatter -->
<Logger name="software.amazon.kinesis" level="warn"/>
<!-- Root logger: info level, sends events to both appenders defined above. -->
<Root level="info">
<AppenderRef ref="STDOUT"/>
<AppenderRef ref="graylog-internal-logs"/>
</Root>
</Loggers>
</Configuration>

View File

@@ -817,3 +817,5 @@ mongodb_max_connections = 1000
# instability. Proceed with caution.
# Default: 0
#search_query_engine_data_lake_jobs_queue_size = 0
license_manager_url = https://license-dev.torch.sh/