Merge branch 'master' into migrate-license-subscribers-to-cluster-event-bus

This commit is contained in:
Dennis Oelkers
2026-03-11 10:54:37 +01:00
committed by GitHub
5 changed files with 0 additions and 1084 deletions

View File

@@ -1,268 +0,0 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.events.processor.mongodb;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoCollection;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.graylog.events.event.Event;
import org.graylog.events.event.EventFactory;
import org.graylog.events.event.EventOriginContext;
import org.graylog.events.event.EventReplayInfo;
import org.graylog.events.event.EventWithContext;
import org.graylog.events.fields.FieldValue;
import org.graylog.events.processor.DBEventProcessorStateService;
import org.graylog.events.processor.EventConsumer;
import org.graylog.events.processor.EventDefinition;
import org.graylog.events.processor.EventProcessor;
import org.graylog.events.processor.EventProcessorException;
import org.graylog.events.processor.EventProcessorParameters;
import org.graylog2.database.MongoCollections;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.MessageFactory;
import org.graylog2.plugin.MessageSummary;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;
import static com.mongodb.client.model.Aggregates.limit;
import static com.mongodb.client.model.Aggregates.match;
import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.gte;
import static com.mongodb.client.model.Filters.lt;
/**
* EventProcessor implementation that executes a MongoDB aggregation pipeline to create events based on the aggregation results.
* Events are intended to alert to specific DB conditions - the pipeline is expected to produce a single document result,
* which is then converted into an Event.
*/
public abstract class MongoDBEventProcessor implements EventProcessor {
private static final Logger LOG = LoggerFactory.getLogger(MongoDBEventProcessor.class);
private static final int MONGO_BATCHSIZE = 200;
private final EventDefinition eventDefinition;
private final MongoDBEventProcessorConfig config;
private final String collectionName;
private final MongoCollection<Document> collection;
private final DBEventProcessorStateService stateService;
private final MessageFactory messageFactory;
private final ObjectMapper objectMapper;
// The collectionName is provided explicitly by the subclass rather than read from config,
// so the processor only accesses collections the subclass intends.
protected MongoDBEventProcessor(EventDefinition eventDefinition,
MongoDBEventProcessorConfig config,
String collectionName,
MongoCollections mongoCollections,
DBEventProcessorStateService stateService,
MessageFactory messageFactory,
ObjectMapper objectMapper) {
this.eventDefinition = eventDefinition;
this.config = config;
this.collectionName = collectionName;
this.collection = mongoCollections.nonEntityCollection(collectionName, Document.class);
this.stateService = stateService;
this.messageFactory = messageFactory;
this.objectMapper = objectMapper;
}
@Override
public void createEvents(EventFactory eventFactory,
EventProcessorParameters processorParameters,
EventConsumer<List<EventWithContext>> eventsConsumer)
throws EventProcessorException {
if (!(processorParameters instanceof final MongoDBEventProcessorParameters parameters)) {
final String message = String.format(Locale.ROOT,
"Invalid parameters type for MongoDBEventProcessor. Expected <%s> but got <%s> for event definition <%s/%s>.",
MongoDBEventProcessorParameters.class.getSimpleName(),
processorParameters != null ? processorParameters.getClass().getName() : "null",
eventDefinition.title(),
eventDefinition.id());
throw new EventProcessorException(message, false, eventDefinition, null);
}
LOG.debug("Creating events from MongoDB aggregation for config={} parameters={}", config, parameters);
try {
List<Bson> pipeline = buildAggregationPipeline(parameters);
AggregateIterable<Document> aggregateIterable = collection
.aggregate(pipeline, Document.class)
.batchSize(MONGO_BATCHSIZE);
Document result = aggregateIterable.first();
if (result == null) {
LOG.debug("Aggregation returned no results for timerange {} to {}",
parameters.timerange().getFrom(), parameters.timerange().getTo());
// Update state even if no results
stateService.setState(eventDefinition.id(),
parameters.timerange().getFrom(),
parameters.timerange().getTo());
return;
}
Event event = createEventFromAggregation(eventFactory, result, parameters);
Message message = convertAggregationToMessage(result, parameters);
List<EventWithContext> events = Collections.singletonList(
EventWithContext.create(event, message)
);
eventsConsumer.accept(events);
LOG.debug("Created 1 event from MongoDB aggregation result: {} {}",
events.stream().findFirst().map(e -> e.event().getEventDefinitionType()).orElse("N/A"),
result.toJson());
stateService.setState(eventDefinition.id(),
parameters.timerange().getFrom(),
parameters.timerange().getTo());
} catch (Exception e) {
final String errorMsg = String.format(Locale.ROOT,
"Failed to execute MongoDB aggregation for event definition <%s/%s>: %s",
eventDefinition.title(), eventDefinition.id(), e.getMessage());
LOG.error(errorMsg, e);
throw new EventProcessorException(errorMsg, false, eventDefinition, e);
}
}
private List<Bson> buildAggregationPipeline(MongoDBEventProcessorParameters parameters) throws EventProcessorException {
List<Bson> pipeline = new ArrayList<>();
// Add time range filter as first stage
Bson timeMatchStage = match(and(
gte(config.timestampField(), parameters.timerange().getFrom().toDate()),
lt(config.timestampField(), parameters.timerange().getTo().toDate())
));
pipeline.add(timeMatchStage);
// Parse and add user's pipeline stages
if (config.aggregationPipeline() != null && !config.aggregationPipeline().trim().isEmpty()) {
try {
final JsonNode userPipeline = objectMapper.readTree(config.aggregationPipeline());
if (!userPipeline.isArray()) {
throw new EventProcessorException("Aggregation pipeline must be a JSON array", true, eventDefinition, null);
}
for (final JsonNode stage : userPipeline) {
pipeline.add(Document.parse(objectMapper.writeValueAsString(stage)));
}
} catch (Exception e) {
LOG.error("Failed to parse aggregation pipeline: {}", config.aggregationPipeline(), e);
throw new EventProcessorException("Invalid aggregation pipeline: " + e.getMessage(), true, eventDefinition, e);
}
}
// Limit to 1 result (pipeline should naturally produce one result)
pipeline.add(limit(1));
return pipeline;
}
private Event createEventFromAggregation(EventFactory eventFactory,
Document aggregationResult,
MongoDBEventProcessorParameters parameters) {
// Use end of time range as event timestamp
DateTime timestamp = parameters.timerange().getTo();
Event event = eventFactory.createEvent(eventDefinition, timestamp, eventDefinition.title());
event.setTimerangeStart(parameters.timerange().getFrom());
event.setTimerangeEnd(parameters.timerange().getTo());
event.setOriginContext(EventOriginContext.mongodbAggregation(
collectionName,
parameters.timerange().getFrom().getMillis(),
parameters.timerange().getTo().getMillis()
));
// Add aggregation result fields to event
for (Map.Entry<String, Object> entry : aggregationResult.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
// Skip _id from $group, but include all other fields
if (!"_id".equals(key)) {
event.setField(key, FieldValue.string(String.valueOf(convertMongoValue(value))));
}
}
event.setReplayInfo(EventReplayInfo.builder()
.timerangeStart(parameters.timerange().getFrom())
.timerangeEnd(parameters.timerange().getTo())
.query(config.aggregationPipeline())
.build());
return event;
}
private Message convertAggregationToMessage(Document aggregationResult,
MongoDBEventProcessorParameters parameters) {
// Use end of time range as message timestamp
DateTime timestamp = parameters.timerange().getTo();
Message message = messageFactory.createMessage(
"MongoDB aggregation from " + collectionName + ": " + aggregationResult.toJson(),
"mongodb-event-processor",
timestamp
);
// Add aggregation result fields
Map<String, Object> fields = new HashMap<>();
fields.put("aggregation_result", aggregationResult.toJson());
fields.put("timerange_start", parameters.timerange().getFrom().toString());
fields.put("timerange_end", parameters.timerange().getTo().toString());
// Add individual aggregation fields
for (Map.Entry<String, Object> entry : aggregationResult.entrySet()) {
if (!"_id".equals(entry.getKey())) {
fields.put(entry.getKey(), convertMongoValue(entry.getValue()));
}
}
message.addFields(fields);
return message;
}
private Object convertMongoValue(Object value) {
if (value instanceof Date date) {
return date.toInstant().toString();
} else if (value instanceof org.bson.types.ObjectId) {
return value.toString();
} else if (value instanceof Number) {
return value;
} else if (value != null) {
return value.toString();
}
return null;
}
@Override
public void sourceMessagesForEvent(Event event,
Consumer<List<MessageSummary>> messageConsumer,
long limit)
throws EventProcessorException {
LOG.debug("sourceMessagesForEvent not applicable for DB aggregation event processor");
}
}

View File

@@ -1,232 +0,0 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.events.processor.mongodb;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.auto.value.AutoValue;
import com.google.common.graph.MutableGraph;
import org.graylog.events.contentpack.entities.EventProcessorConfigEntity;
import org.graylog.events.processor.EventDefinition;
import org.graylog.events.processor.EventProcessorConfig;
import org.graylog.events.processor.EventProcessorExecutionJob;
import org.graylog.events.processor.EventProcessorSchedulerConfig;
import org.graylog.scheduler.JobSchedule;
import org.graylog.scheduler.clock.JobSchedulerClock;
import org.graylog.scheduler.schedule.IntervalJobSchedule;
import org.graylog.security.UserContext;
import org.graylog2.contentpacks.EntityDescriptorIds;
import org.graylog2.contentpacks.model.entities.EntityDescriptor;
import org.graylog2.plugin.indexer.searches.timeranges.AbsoluteRange;
import org.graylog2.plugin.rest.ValidationResult;
import org.joda.time.DateTime;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@AutoValue
@JsonTypeName(MongoDBEventProcessorConfig.TYPE_NAME)
@JsonDeserialize(builder = MongoDBEventProcessorConfig.Builder.class)
public abstract class MongoDBEventProcessorConfig implements EventProcessorConfig {
public static final String TYPE_NAME = "mongodb-v1";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String FIELD_AGGREGATION_PIPELINE = "aggregation_pipeline";
private static final String FIELD_TIMESTAMP_FIELD = "timestamp_field";
private static final String FIELD_SEARCH_WITHIN_SECONDS = "search_within_seconds";
private static final String FIELD_EXECUTE_EVERY_SECONDS = "execute_every_seconds";
@JsonProperty(FIELD_AGGREGATION_PIPELINE)
public abstract String aggregationPipeline();
@JsonProperty(FIELD_TIMESTAMP_FIELD)
public abstract String timestampField();
// Ignored for START_OF_DAY mode
@JsonProperty(FIELD_SEARCH_WITHIN_SECONDS)
public abstract long searchWithinSeconds();
@JsonProperty(FIELD_EXECUTE_EVERY_SECONDS)
public abstract long executeEverySeconds();
@Override
public Set<String> requiredPermissions() {
// MongoDB event processor doesn't require specific stream permissions
return Collections.emptySet();
}
@Override
public boolean isUserPresentable() {
// This type is an internal base type not intended for direct use via the API.
// Use specific subtypes (e.g., traffic-v1) instead.
return false;
}
public static Builder builder() {
return Builder.create();
}
public abstract Builder toBuilder();
@Override
public Optional<EventProcessorSchedulerConfig> toJobSchedulerConfig(EventDefinition eventDefinition, JobSchedulerClock clock) {
return createSchedulerConfig(eventDefinition, clock, searchWithinSeconds(), executeEverySeconds());
}
public static Optional<EventProcessorSchedulerConfig> createSchedulerConfig(EventDefinition eventDefinition,
JobSchedulerClock clock,
long searchWithinSeconds,
long executeEverySeconds) {
return createSchedulerConfig(
eventDefinition, clock, searchWithinSeconds, executeEverySeconds,
true); // DB events are not generally expected to be linked to time-stamped objects, so no catch-up by default
}
/**
* Helper method to create scheduler config from time parameters with optional catch-up control.
*
* @param disableCatchup if false, enables catch-up behavior when processor falls behind; if true, disables catch-up
*/
public static Optional<EventProcessorSchedulerConfig> createSchedulerConfig(EventDefinition eventDefinition,
JobSchedulerClock clock,
long searchWithinSeconds,
long executeEverySeconds,
boolean disableCatchup) {
final DateTime now = clock.nowUTC();
// Convert seconds to milliseconds for scheduler
final long searchWithinMs = searchWithinSeconds * 1000;
final long executeEveryMs = executeEverySeconds * 1000;
// Create interval-based schedule
final JobSchedule schedule = IntervalJobSchedule.builder()
.interval(executeEveryMs)
.unit(TimeUnit.MILLISECONDS)
.build();
// Initial timerange for first execution
final AbsoluteRange timerange = AbsoluteRange.create(now.minus(searchWithinMs), now);
final EventProcessorExecutionJob.Config jobDefinitionConfig = EventProcessorExecutionJob.Config.builder()
.eventDefinitionId(eventDefinition.id())
.processingWindowSize(searchWithinMs)
.processingHopSize(executeEveryMs)
.disableCatchup(disableCatchup)
.parameters(MongoDBEventProcessorParameters.builder()
.timerange(timerange)
.build())
.build();
return Optional.of(EventProcessorSchedulerConfig.create(jobDefinitionConfig, schedule));
}
@AutoValue.Builder
public abstract static class Builder implements EventProcessorConfig.Builder<Builder> {
@JsonCreator
public static Builder create() {
return new AutoValue_MongoDBEventProcessorConfig.Builder()
.type(TYPE_NAME)
.timestampField("bucket") // Default timestamp field
.searchWithinSeconds(60) // Default: 1 minute
.executeEverySeconds(60); // Default: 1 minute
}
@JsonProperty(FIELD_AGGREGATION_PIPELINE)
public abstract Builder aggregationPipeline(String aggregationPipeline);
@JsonProperty(FIELD_TIMESTAMP_FIELD)
public abstract Builder timestampField(String timestampField);
@JsonProperty(FIELD_SEARCH_WITHIN_SECONDS)
public abstract Builder searchWithinSeconds(long searchWithinSeconds);
@JsonProperty(FIELD_EXECUTE_EVERY_SECONDS)
public abstract Builder executeEverySeconds(long executeEverySeconds);
public abstract MongoDBEventProcessorConfig build();
}
@Override
public ValidationResult validate(UserContext userContext) {
final ValidationResult validationResult = new ValidationResult();
// The mongodb-v1 type is an internal base type and must not be created directly via the API.
// Specific subtypes (e.g., traffic-v1) have their own configs and validation.
validationResult.addError("type",
"The mongodb-v1 event processor type cannot be created directly. Use a specific subtype instead.");
validateField(timestampField() != null && !timestampField().isBlank(),
FIELD_TIMESTAMP_FIELD, "Timestamp field must not be empty", validationResult);
validateField(searchWithinSeconds() > 0,
FIELD_SEARCH_WITHIN_SECONDS, "Search window must be greater than 0", validationResult);
validateField(executeEverySeconds() > 0,
FIELD_EXECUTE_EVERY_SECONDS, "Execution interval must be greater than 0", validationResult);
validateAggregationPipeline(validationResult);
return validationResult;
}
private void validateField(boolean valid, String field, String errorMsg, ValidationResult validationResult) {
if (!valid) {
validationResult.addError(field, errorMsg);
}
}
private void validateAggregationPipeline(ValidationResult validationResult) {
if (aggregationPipeline() == null || aggregationPipeline().isBlank()) {
validationResult.addError(FIELD_AGGREGATION_PIPELINE, "Aggregation pipeline is required");
return;
}
try {
final JsonNode pipeline = OBJECT_MAPPER.readTree(aggregationPipeline());
if (!pipeline.isArray()) {
validationResult.addError(FIELD_AGGREGATION_PIPELINE, "Aggregation pipeline must be a JSON array");
} else if (pipeline.isEmpty()) {
validationResult.addError(FIELD_AGGREGATION_PIPELINE, "Aggregation pipeline must have at least one stage");
} else {
// $out and $merge can only appear as top-level pipeline stages, so a shallow check is sufficient.
for (final JsonNode stage : pipeline) {
if (stage.has("$out") || stage.has("$merge")) {
validationResult.addError(FIELD_AGGREGATION_PIPELINE,
"Aggregation pipeline must not contain $out or $merge stages");
break;
}
}
}
} catch (Exception e) {
validationResult.addError(FIELD_AGGREGATION_PIPELINE,
"Invalid aggregation pipeline JSON: " + e.getMessage());
}
}
@Override
public EventProcessorConfigEntity toContentPackEntity(EntityDescriptorIds entityDescriptorIds) {
// Content pack support is not implemented for MongoDBEventProcessorConfig yet.
throw new UnsupportedOperationException("Content pack export is not supported for MongoDBEventProcessorConfig");
}
@Override
public void resolveNativeEntity(EntityDescriptor entityDescriptor, MutableGraph<EntityDescriptor> mutableGraph) {
// No external dependencies to resolve
}
}

View File

@@ -1,62 +0,0 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.events.processor.mongodb;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.auto.value.AutoValue;
import org.graylog.events.processor.EventProcessorParametersWithTimerange;
import org.graylog2.plugin.indexer.searches.timeranges.AbsoluteRange;
import org.graylog2.plugin.indexer.searches.timeranges.RelativeRange;
import org.joda.time.DateTime;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
@AutoValue
@JsonTypeName(MongoDBEventProcessorConfig.TYPE_NAME)
@JsonDeserialize(builder = MongoDBEventProcessorParameters.Builder.class)
public abstract class MongoDBEventProcessorParameters implements EventProcessorParametersWithTimerange {
@Override
public EventProcessorParametersWithTimerange withTimerange(DateTime from, DateTime to) {
requireNonNull(from, "from cannot be null");
requireNonNull(to, "to cannot be null");
checkArgument(to.isAfter(from), "to must be after from");
return toBuilder().timerange(AbsoluteRange.create(from, to)).build();
}
public abstract Builder toBuilder();
public static Builder builder() {
return Builder.create();
}
@AutoValue.Builder
public abstract static class Builder implements EventProcessorParametersWithTimerange.Builder<Builder> {
@JsonCreator
public static Builder create() {
return new AutoValue_MongoDBEventProcessorParameters.Builder()
.type(MongoDBEventProcessorConfig.TYPE_NAME)
.timerange(RelativeRange.create(3600));
}
public abstract MongoDBEventProcessorParameters build();
}
}

View File

@@ -1,115 +0,0 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.events.processor.mongodb;
import org.graylog2.plugin.rest.ValidationResult;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
class MongoDBEventProcessorConfigTest {
private static final String VALID_PIPELINE = """
[{"$group": {"_id": null, "count": {"$sum": 1}}}]""";
private MongoDBEventProcessorConfig validConfig() {
return MongoDBEventProcessorConfig.builder()
.aggregationPipeline(VALID_PIPELINE)
.timestampField("bucket")
.searchWithinSeconds(60)
.executeEverySeconds(60)
.build();
}
@Test
void isUserPresentable_returnsFalse() {
final MongoDBEventProcessorConfig config = validConfig();
assertThat(config.isUserPresentable()).isFalse();
}
@Test
void validate_alwaysRejectsDirectUse() {
final MongoDBEventProcessorConfig config = validConfig();
final ValidationResult result = config.validate(null);
assertThat(result.failed()).isTrue();
assertThat(result.getErrors()).containsKey("type");
}
@Test
void validate_rejectsPipelineWithOutOperator() {
final MongoDBEventProcessorConfig config = MongoDBEventProcessorConfig.builder()
.aggregationPipeline("[{\"$out\": \"malicious_collection\"}]")
.timestampField("bucket")
.searchWithinSeconds(60)
.executeEverySeconds(60)
.build();
final ValidationResult result = config.validate(null);
assertThat(result.failed()).isTrue();
assertThat(result.getErrors()).containsKey("aggregation_pipeline");
assertThat(result.getErrors().get("aggregation_pipeline").toString())
.contains("$out or $merge");
}
@Test
void validate_rejectsPipelineWithMergeOperator() {
final MongoDBEventProcessorConfig config = MongoDBEventProcessorConfig.builder()
.aggregationPipeline("[{\"$merge\": {\"into\": \"malicious_collection\"}}]")
.timestampField("bucket")
.searchWithinSeconds(60)
.executeEverySeconds(60)
.build();
final ValidationResult result = config.validate(null);
assertThat(result.failed()).isTrue();
assertThat(result.getErrors()).containsKey("aggregation_pipeline");
assertThat(result.getErrors().get("aggregation_pipeline").toString())
.contains("$out or $merge");
}
@Test
void validate_validPipelineDoesNotProducePipelineErrors() {
final MongoDBEventProcessorConfig config = validConfig();
final ValidationResult result = config.validate(null);
// The type error is always present, but no pipeline-specific errors
assertThat(result.getErrors()).containsKey("type");
assertThat(result.getErrors()).doesNotContainKey("aggregation_pipeline");
}
@Test
void validate_rejectsEmptyPipeline() {
final MongoDBEventProcessorConfig config = MongoDBEventProcessorConfig.builder()
.aggregationPipeline("")
.timestampField("bucket")
.searchWithinSeconds(60)
.executeEverySeconds(60)
.build();
final ValidationResult result = config.validate(null);
assertThat(result.failed()).isTrue();
assertThat(result.getErrors()).containsKey("aggregation_pipeline");
}
}

View File

@@ -1,407 +0,0 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.events.processor.mongodb;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.MongoException;
import com.mongodb.client.AggregateIterable;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.graylog.events.event.Event;
import org.graylog.events.event.EventFactory;
import org.graylog.events.event.EventReplayInfo;
import org.graylog.events.event.EventWithContext;
import org.graylog.events.fields.FieldValue;
import org.graylog.events.processor.DBEventProcessorStateService;
import org.graylog.events.processor.EventConsumer;
import org.graylog.events.processor.EventDefinition;
import org.graylog.events.processor.EventProcessorException;
import org.graylog.events.processor.EventProcessorParameters;
import org.graylog2.database.MongoCollections;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.MessageFactory;
import org.graylog2.plugin.indexer.searches.timeranges.AbsoluteRange;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class MongoDBEventProcessorTest {
private static final String COLLECTION_NAME = "test_collection";
private static final String VALID_PIPELINE = """
[{"$group": {"_id": null, "count": {"$sum": 1}}}]""";
@Mock
private MongoCollections mongoCollections;
@Mock
private DBEventProcessorStateService stateService;
@Mock
private MessageFactory messageFactory;
@Mock
private EventDefinition eventDefinition;
@Mock
private EventFactory eventFactory;
private final ObjectMapper objectMapper = new ObjectMapper();
/**
* Minimal concrete subclass of MongoDBEventProcessor for testing the base class behavior.
* Does not override createEvents — all events pass through unfiltered.
*/
static class TestableMongoDBEventProcessor extends MongoDBEventProcessor {
TestableMongoDBEventProcessor(EventDefinition eventDefinition,
MongoDBEventProcessorConfig config,
String collectionName,
MongoCollections mongoCollections,
DBEventProcessorStateService stateService,
MessageFactory messageFactory,
ObjectMapper objectMapper) {
super(eventDefinition, config, collectionName, mongoCollections, stateService, messageFactory, objectMapper);
}
}
@BeforeEach
void setUp() {
lenient().when(eventDefinition.id()).thenReturn("test-event-def-id");
lenient().when(eventDefinition.title()).thenReturn("Test Event");
}
private TestableMongoDBEventProcessor createProcessor(MongoDBEventProcessorConfig config) {
return new TestableMongoDBEventProcessor(eventDefinition, config, COLLECTION_NAME,
mongoCollections, stateService, messageFactory, objectMapper);
}
private MongoDBEventProcessorConfig createConfig() {
return createConfig(VALID_PIPELINE);
}
private MongoDBEventProcessorConfig createConfig(String pipeline) {
return MongoDBEventProcessorConfig.builder()
.aggregationPipeline(pipeline)
.timestampField("bucket")
.searchWithinSeconds(60)
.executeEverySeconds(60)
.build();
}
private MongoDBEventProcessorParameters createParams(DateTime from, DateTime to) {
return MongoDBEventProcessorParameters.builder()
.timerange(AbsoluteRange.create(from, to))
.build();
}
/**
* Creates a mock Event that tracks setField/getField calls via a backing HashMap,
* so that field values set by the processor can be inspected in assertions.
*/
private Event createFieldTrackingEvent() {
final Map<String, FieldValue> fields = new HashMap<>();
final Event event = mock(Event.class);
doAnswer(inv -> {
fields.put(inv.getArgument(0), inv.getArgument(1));
return null;
}).when(event).setField(anyString(), any(FieldValue.class));
lenient().when(event.getField(anyString())).thenAnswer(inv -> fields.get(inv.getArgument(0)));
lenient().when(event.getMessage()).thenReturn("Test Event");
return event;
}
@SuppressWarnings("unchecked")
private void setupAggregation(Document result) {
final com.mongodb.client.MongoCollection<Document> rawCollection = mock(com.mongodb.client.MongoCollection.class);
final AggregateIterable<Document> aggregateIterable = mock(AggregateIterable.class);
when(mongoCollections.nonEntityCollection(eq(COLLECTION_NAME), eq(Document.class))).thenReturn(rawCollection);
when(rawCollection.aggregate(any(List.class), eq(Document.class))).thenReturn(aggregateIterable);
when(aggregateIterable.batchSize(anyInt())).thenReturn(aggregateIterable);
when(aggregateIterable.first()).thenReturn(result);
}
@SuppressWarnings("unchecked")
private void setupAggregationThrows(RuntimeException exception) {
final com.mongodb.client.MongoCollection<Document> rawCollection = mock(com.mongodb.client.MongoCollection.class);
final AggregateIterable<Document> aggregateIterable = mock(AggregateIterable.class);
when(mongoCollections.nonEntityCollection(eq(COLLECTION_NAME), eq(Document.class))).thenReturn(rawCollection);
when(rawCollection.aggregate(any(List.class), eq(Document.class))).thenReturn(aggregateIterable);
when(aggregateIterable.batchSize(anyInt())).thenReturn(aggregateIterable);
when(aggregateIterable.first()).thenThrow(exception);
}
// ==================== Invalid Parameters ====================
@Test
void createEvents_invalidParameterType_throwsException() {
final var processor = createProcessor(createConfig());
final EventProcessorParameters invalidParams = mock(EventProcessorParameters.class);
@SuppressWarnings("unchecked")
final EventConsumer<List<EventWithContext>> consumer = mock(EventConsumer.class);
assertThatThrownBy(() -> processor.createEvents(eventFactory, invalidParams, consumer))
.isInstanceOf(EventProcessorException.class)
.hasMessageContaining("Invalid parameters type")
.hasMessageContaining("MongoDBEventProcessorParameters");
}
// ==================== Event Field Population ====================
@SuppressWarnings("unchecked")
@Test
void createEvents_setsEventFieldsFromAggregation() throws Exception {
final Document result = new Document()
.append("_id", null)
.append("count", 42)
.append("total_bytes", 1024L);
setupAggregation(result);
final Event event = createFieldTrackingEvent();
when(eventFactory.createEvent(any(), any(), any())).thenReturn(event);
when(messageFactory.createMessage(anyString(), anyString(), any(DateTime.class))).thenReturn(mock(Message.class));
final var processor = createProcessor(createConfig());
final EventConsumer<List<EventWithContext>> consumer = mock(EventConsumer.class);
final DateTime from = DateTime.now(DateTimeZone.UTC).minusHours(1);
final DateTime to = DateTime.now(DateTimeZone.UTC);
processor.createEvents(eventFactory, createParams(from, to), consumer);
// Aggregation fields should be set on the event (as string values)
assertThat(event.getField("count")).isNotNull();
assertThat(event.getField("count").value()).isEqualTo("42");
assertThat(event.getField("total_bytes")).isNotNull();
assertThat(event.getField("total_bytes").value()).isEqualTo("1024");
// _id should be skipped
assertThat(event.getField("_id")).isNull();
}
// ==================== Event Metadata ====================
@SuppressWarnings("unchecked")
@Test
void createEvents_setsEventMetadata() throws Exception {
final Document result = new Document("count", 1);
setupAggregation(result);
final Event event = createFieldTrackingEvent();
when(eventFactory.createEvent(any(), any(), any())).thenReturn(event);
when(messageFactory.createMessage(anyString(), anyString(), any(DateTime.class))).thenReturn(mock(Message.class));
final var processor = createProcessor(createConfig());
final EventConsumer<List<EventWithContext>> consumer = mock(EventConsumer.class);
final DateTime from = new DateTime(2026, 3, 5, 0, 0, DateTimeZone.UTC);
final DateTime to = new DateTime(2026, 3, 5, 12, 0, DateTimeZone.UTC);
processor.createEvents(eventFactory, createParams(from, to), consumer);
// Event should be created with the end-of-timerange timestamp and event definition title
verify(eventFactory).createEvent(eq(eventDefinition), eq(to), eq("Test Event"));
// Timerange should be set
verify(event).setTimerangeStart(from);
verify(event).setTimerangeEnd(to);
// Origin context should reference the collection and time range
final ArgumentCaptor<String> originCaptor = ArgumentCaptor.forClass(String.class);
verify(event).setOriginContext(originCaptor.capture());
assertThat(originCaptor.getValue())
.contains(COLLECTION_NAME)
.contains(String.valueOf(from.getMillis()))
.contains(String.valueOf(to.getMillis()));
// Replay info should be set with timerange and pipeline query
final ArgumentCaptor<EventReplayInfo> replayCaptor = ArgumentCaptor.forClass(EventReplayInfo.class);
verify(event).setReplayInfo(replayCaptor.capture());
final EventReplayInfo replayInfo = replayCaptor.getValue();
assertThat(replayInfo.timerangeStart()).isEqualTo(from);
assertThat(replayInfo.timerangeEnd()).isEqualTo(to);
assertThat(replayInfo.query()).isEqualTo(VALID_PIPELINE);
}
// ==================== Message Creation ====================
@SuppressWarnings("unchecked")
@Test
void createEvents_createsMessageWithAggregationResult() throws Exception {
final Document result = new Document()
.append("_id", null)
.append("count", 42);
setupAggregation(result);
final Event event = createFieldTrackingEvent();
when(eventFactory.createEvent(any(), any(), any())).thenReturn(event);
final Message message = mock(Message.class);
when(messageFactory.createMessage(anyString(), anyString(), any(DateTime.class))).thenReturn(message);
final var processor = createProcessor(createConfig());
final EventConsumer<List<EventWithContext>> consumer = mock(EventConsumer.class);
final DateTime from = new DateTime(2026, 3, 5, 0, 0, DateTimeZone.UTC);
final DateTime to = new DateTime(2026, 3, 5, 12, 0, DateTimeZone.UTC);
processor.createEvents(eventFactory, createParams(from, to), consumer);
// Message should be created with collection name in text, at the end-of-timerange timestamp
final ArgumentCaptor<String> msgTextCaptor = ArgumentCaptor.forClass(String.class);
verify(messageFactory).createMessage(msgTextCaptor.capture(), eq("mongodb-event-processor"), eq(to));
assertThat(msgTextCaptor.getValue()).contains(COLLECTION_NAME);
// Message should have aggregation fields added
@SuppressWarnings("unchecked")
final ArgumentCaptor<Map<String, Object>> fieldsCaptor = ArgumentCaptor.forClass(Map.class);
verify(message).addFields(fieldsCaptor.capture());
final Map<String, Object> messageFields = fieldsCaptor.getValue();
assertThat(messageFields).containsKey("aggregation_result");
assertThat(messageFields).containsKey("timerange_start");
assertThat(messageFields).containsKey("timerange_end");
assertThat(messageFields).containsEntry("count", 42);
// _id should be skipped in message fields too
assertThat(messageFields).doesNotContainKey("_id");
}
// ==================== State Service ====================
@SuppressWarnings("unchecked")
@Test
void createEvents_updatesStateOnResult() throws Exception {
final Document result = new Document("count", 1);
setupAggregation(result);
final Event event = createFieldTrackingEvent();
when(eventFactory.createEvent(any(), any(), any())).thenReturn(event);
when(messageFactory.createMessage(anyString(), anyString(), any(DateTime.class))).thenReturn(mock(Message.class));
final var processor = createProcessor(createConfig());
final EventConsumer<List<EventWithContext>> consumer = mock(EventConsumer.class);
final DateTime from = new DateTime(2026, 3, 5, 0, 0, DateTimeZone.UTC);
final DateTime to = new DateTime(2026, 3, 5, 12, 0, DateTimeZone.UTC);
processor.createEvents(eventFactory, createParams(from, to), consumer);
verify(stateService).setState("test-event-def-id", from, to);
}
@SuppressWarnings("unchecked")
@Test
void createEvents_updatesStateOnNoResult() throws Exception {
setupAggregation(null);
final var processor = createProcessor(createConfig());
final EventConsumer<List<EventWithContext>> consumer = mock(EventConsumer.class);
final DateTime from = new DateTime(2026, 3, 5, 0, 0, DateTimeZone.UTC);
final DateTime to = new DateTime(2026, 3, 5, 12, 0, DateTimeZone.UTC);
processor.createEvents(eventFactory, createParams(from, to), consumer);
// State should still be updated even with no results
verify(stateService).setState("test-event-def-id", from, to);
// Consumer should NOT be called
verify(consumer, never()).accept(any());
}
// ==================== Exception Handling ====================
@SuppressWarnings("unchecked")
@Test
void createEvents_wrapsAggregationException() {
setupAggregationThrows(new MongoException("Connection lost"));
final var processor = createProcessor(createConfig());
final EventConsumer<List<EventWithContext>> consumer = mock(EventConsumer.class);
final DateTime from = DateTime.now(DateTimeZone.UTC).minusHours(1);
final DateTime to = DateTime.now(DateTimeZone.UTC);
assertThatThrownBy(() -> processor.createEvents(eventFactory, createParams(from, to), consumer))
.isInstanceOf(EventProcessorException.class)
.hasMessageContaining("Failed to execute MongoDB aggregation")
.hasMessageContaining("Test Event")
.hasCauseInstanceOf(MongoException.class);
}
// ==================== Value Conversion ====================
@SuppressWarnings("unchecked")
@Test
void createEvents_convertsMongoValueTypes() throws Exception {
final Date javaDate = new Date(1709596800000L); // 2024-03-05T00:00:00Z
final ObjectId objectId = new ObjectId("507f1f77bcf86cd799439011");
final Document result = new Document()
.append("date_field", javaDate)
.append("objectid_field", objectId)
.append("number_field", 12345L)
.append("string_field", "hello")
.append("null_field", null);
setupAggregation(result);
final Event event = createFieldTrackingEvent();
when(eventFactory.createEvent(any(), any(), any())).thenReturn(event);
when(messageFactory.createMessage(anyString(), anyString(), any(DateTime.class))).thenReturn(mock(Message.class));
final var processor = createProcessor(createConfig());
final EventConsumer<List<EventWithContext>> consumer = mock(EventConsumer.class);
final DateTime from = DateTime.now(DateTimeZone.UTC).minusHours(1);
final DateTime to = DateTime.now(DateTimeZone.UTC);
processor.createEvents(eventFactory, createParams(from, to), consumer);
// Date should be converted to a DateTime string representation
assertThat(event.getField("date_field")).isNotNull();
assertThat(event.getField("date_field").value()).contains("2024-03-05");
// ObjectId should be converted to its string representation
assertThat(event.getField("objectid_field")).isNotNull();
assertThat(event.getField("objectid_field").value()).isEqualTo("507f1f77bcf86cd799439011");
// Numbers should be preserved (as string via FieldValue.string())
assertThat(event.getField("number_field")).isNotNull();
assertThat(event.getField("number_field").value()).isEqualTo("12345");
// Strings should be preserved
assertThat(event.getField("string_field")).isNotNull();
assertThat(event.getField("string_field").value()).isEqualTo("hello");
// Null values should result in "null" string (String.valueOf(null))
assertThat(event.getField("null_field")).isNotNull();
assertThat(event.getField("null_field").value()).isEqualTo("null");
}
}