From c62a1c4b113f5a06762add74a6ae4dce1a1f3746 Mon Sep 17 00:00:00 2001 From: Patrick Mann Date: Wed, 11 Mar 2026 10:46:22 +0100 Subject: [PATCH] Eliminate MongoDBEventProcessor (#25277) * eliminate MongoDBEventProcessor * revert unneeded rename --- .../mongodb/MongoDBEventProcessor.java | 268 ------------ .../mongodb/MongoDBEventProcessorConfig.java | 232 ---------- .../MongoDBEventProcessorParameters.java | 62 --- .../MongoDBEventProcessorConfigTest.java | 115 ----- .../mongodb/MongoDBEventProcessorTest.java | 407 ------------------ 5 files changed, 1084 deletions(-) delete mode 100644 graylog2-server/src/main/java/org/graylog/events/processor/mongodb/MongoDBEventProcessor.java delete mode 100644 graylog2-server/src/main/java/org/graylog/events/processor/mongodb/MongoDBEventProcessorConfig.java delete mode 100644 graylog2-server/src/main/java/org/graylog/events/processor/mongodb/MongoDBEventProcessorParameters.java delete mode 100644 graylog2-server/src/test/java/org/graylog/events/processor/mongodb/MongoDBEventProcessorConfigTest.java delete mode 100644 graylog2-server/src/test/java/org/graylog/events/processor/mongodb/MongoDBEventProcessorTest.java diff --git a/graylog2-server/src/main/java/org/graylog/events/processor/mongodb/MongoDBEventProcessor.java b/graylog2-server/src/main/java/org/graylog/events/processor/mongodb/MongoDBEventProcessor.java deleted file mode 100644 index 2fdfce358a..0000000000 --- a/graylog2-server/src/main/java/org/graylog/events/processor/mongodb/MongoDBEventProcessor.java +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Copyright (C) 2020 Graylog, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - */ -package org.graylog.events.processor.mongodb; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.mongodb.client.AggregateIterable; -import com.mongodb.client.MongoCollection; -import org.bson.Document; -import org.bson.conversions.Bson; -import org.graylog.events.event.Event; -import org.graylog.events.event.EventFactory; -import org.graylog.events.event.EventOriginContext; -import org.graylog.events.event.EventReplayInfo; -import org.graylog.events.event.EventWithContext; -import org.graylog.events.fields.FieldValue; -import org.graylog.events.processor.DBEventProcessorStateService; -import org.graylog.events.processor.EventConsumer; -import org.graylog.events.processor.EventDefinition; -import org.graylog.events.processor.EventProcessor; -import org.graylog.events.processor.EventProcessorException; -import org.graylog.events.processor.EventProcessorParameters; -import org.graylog2.database.MongoCollections; -import org.graylog2.plugin.Message; -import org.graylog2.plugin.MessageFactory; -import org.graylog2.plugin.MessageSummary; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.function.Consumer; - -import static com.mongodb.client.model.Aggregates.limit; -import static com.mongodb.client.model.Aggregates.match; -import static com.mongodb.client.model.Filters.and; -import static com.mongodb.client.model.Filters.gte; -import static com.mongodb.client.model.Filters.lt; - -/** - * EventProcessor implementation that executes a MongoDB aggregation pipeline to create events based on the aggregation results. - * Events are intended to alert to specific DB conditions - the pipeline is expected to produce a single document result, - * which is then converted into an Event. - */ -public abstract class MongoDBEventProcessor implements EventProcessor { - - private static final Logger LOG = LoggerFactory.getLogger(MongoDBEventProcessor.class); - - private static final int MONGO_BATCHSIZE = 200; - - private final EventDefinition eventDefinition; - private final MongoDBEventProcessorConfig config; - private final String collectionName; - private final MongoCollection collection; - private final DBEventProcessorStateService stateService; - private final MessageFactory messageFactory; - private final ObjectMapper objectMapper; - - // The collectionName is provided explicitly by the subclass rather than read from config, - // so the processor only accesses collections the subclass intends. - protected MongoDBEventProcessor(EventDefinition eventDefinition, - MongoDBEventProcessorConfig config, - String collectionName, - MongoCollections mongoCollections, - DBEventProcessorStateService stateService, - MessageFactory messageFactory, - ObjectMapper objectMapper) { - this.eventDefinition = eventDefinition; - this.config = config; - this.collectionName = collectionName; - this.collection = mongoCollections.nonEntityCollection(collectionName, Document.class); - this.stateService = stateService; - this.messageFactory = messageFactory; - this.objectMapper = objectMapper; - } - - @Override - public void createEvents(EventFactory eventFactory, - EventProcessorParameters processorParameters, - EventConsumer> eventsConsumer) - throws EventProcessorException { - if (!(processorParameters instanceof final MongoDBEventProcessorParameters parameters)) { - final String message = String.format(Locale.ROOT, - "Invalid parameters type for MongoDBEventProcessor. Expected <%s> but got <%s> for event definition <%s/%s>.", - MongoDBEventProcessorParameters.class.getSimpleName(), - processorParameters != null ? processorParameters.getClass().getName() : "null", - eventDefinition.title(), - eventDefinition.id()); - throw new EventProcessorException(message, false, eventDefinition, null); - } - LOG.debug("Creating events from MongoDB aggregation for config={} parameters={}", config, parameters); - - try { - List pipeline = buildAggregationPipeline(parameters); - AggregateIterable aggregateIterable = collection - .aggregate(pipeline, Document.class) - .batchSize(MONGO_BATCHSIZE); - Document result = aggregateIterable.first(); - - if (result == null) { - LOG.debug("Aggregation returned no results for timerange {} to {}", - parameters.timerange().getFrom(), parameters.timerange().getTo()); - // Update state even if no results - stateService.setState(eventDefinition.id(), - parameters.timerange().getFrom(), - parameters.timerange().getTo()); - return; - } - - Event event = createEventFromAggregation(eventFactory, result, parameters); - Message message = convertAggregationToMessage(result, parameters); - List events = Collections.singletonList( - EventWithContext.create(event, message) - ); - eventsConsumer.accept(events); - - LOG.debug("Created 1 event from MongoDB aggregation result: {} {}", - events.stream().findFirst().map(e -> e.event().getEventDefinitionType()).orElse("N/A"), - result.toJson()); - - stateService.setState(eventDefinition.id(), - parameters.timerange().getFrom(), - parameters.timerange().getTo()); - - } catch (Exception e) { - final String errorMsg = String.format(Locale.ROOT, - "Failed to execute MongoDB aggregation for event definition <%s/%s>: %s", - eventDefinition.title(), eventDefinition.id(), e.getMessage()); - LOG.error(errorMsg, e); - throw new EventProcessorException(errorMsg, false, eventDefinition, e); - } - } - - private List buildAggregationPipeline(MongoDBEventProcessorParameters parameters) throws EventProcessorException { - List pipeline = new ArrayList<>(); - - // Add time range filter as first stage - Bson timeMatchStage = match(and( - gte(config.timestampField(), parameters.timerange().getFrom().toDate()), - lt(config.timestampField(), parameters.timerange().getTo().toDate()) - )); - pipeline.add(timeMatchStage); - - // Parse and add user's pipeline stages - if (config.aggregationPipeline() != null && !config.aggregationPipeline().trim().isEmpty()) { - try { - final JsonNode userPipeline = objectMapper.readTree(config.aggregationPipeline()); - if (!userPipeline.isArray()) { - throw new EventProcessorException("Aggregation pipeline must be a JSON array", true, eventDefinition, null); - } - for (final JsonNode stage : userPipeline) { - pipeline.add(Document.parse(objectMapper.writeValueAsString(stage))); - } - } catch (Exception e) { - LOG.error("Failed to parse aggregation pipeline: {}", config.aggregationPipeline(), e); - throw new EventProcessorException("Invalid aggregation pipeline: " + e.getMessage(), true, eventDefinition, e); - } - } - - // Limit to 1 result (pipeline should naturally produce one result) - pipeline.add(limit(1)); - - return pipeline; - } - - private Event createEventFromAggregation(EventFactory eventFactory, - Document aggregationResult, - MongoDBEventProcessorParameters parameters) { - // Use end of time range as event timestamp - DateTime timestamp = parameters.timerange().getTo(); - Event event = eventFactory.createEvent(eventDefinition, timestamp, eventDefinition.title()); - event.setTimerangeStart(parameters.timerange().getFrom()); - event.setTimerangeEnd(parameters.timerange().getTo()); - - event.setOriginContext(EventOriginContext.mongodbAggregation( - collectionName, - parameters.timerange().getFrom().getMillis(), - parameters.timerange().getTo().getMillis() - )); - - // Add aggregation result fields to event - for (Map.Entry entry : aggregationResult.entrySet()) { - String key = entry.getKey(); - Object value = entry.getValue(); - // Skip _id from $group, but include all other fields - if (!"_id".equals(key)) { - event.setField(key, FieldValue.string(String.valueOf(convertMongoValue(value)))); - } - } - - event.setReplayInfo(EventReplayInfo.builder() - .timerangeStart(parameters.timerange().getFrom()) - .timerangeEnd(parameters.timerange().getTo()) - .query(config.aggregationPipeline()) - .build()); - - return event; - } - - private Message convertAggregationToMessage(Document aggregationResult, - MongoDBEventProcessorParameters parameters) { - // Use end of time range as message timestamp - DateTime timestamp = parameters.timerange().getTo(); - - Message message = messageFactory.createMessage( - "MongoDB aggregation from " + collectionName + ": " + aggregationResult.toJson(), - "mongodb-event-processor", - timestamp - ); - - // Add aggregation result fields - Map fields = new HashMap<>(); - fields.put("aggregation_result", aggregationResult.toJson()); - fields.put("timerange_start", parameters.timerange().getFrom().toString()); - fields.put("timerange_end", parameters.timerange().getTo().toString()); - - // Add individual aggregation fields - for (Map.Entry entry : aggregationResult.entrySet()) { - if (!"_id".equals(entry.getKey())) { - fields.put(entry.getKey(), convertMongoValue(entry.getValue())); - } - } - - message.addFields(fields); - return message; - } - - private Object convertMongoValue(Object value) { - if (value instanceof Date date) { - return date.toInstant().toString(); - } else if (value instanceof org.bson.types.ObjectId) { - return value.toString(); - } else if (value instanceof Number) { - return value; - } else if (value != null) { - return value.toString(); - } - return null; - } - - @Override - public void sourceMessagesForEvent(Event event, - Consumer> messageConsumer, - long limit) - throws EventProcessorException { - LOG.debug("sourceMessagesForEvent not applicable for DB aggregation event processor"); - } -} diff --git a/graylog2-server/src/main/java/org/graylog/events/processor/mongodb/MongoDBEventProcessorConfig.java b/graylog2-server/src/main/java/org/graylog/events/processor/mongodb/MongoDBEventProcessorConfig.java deleted file mode 100644 index b3a93b8d75..0000000000 --- a/graylog2-server/src/main/java/org/graylog/events/processor/mongodb/MongoDBEventProcessorConfig.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Copyright (C) 2020 Graylog, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - */ -package org.graylog.events.processor.mongodb; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.google.auto.value.AutoValue; -import com.google.common.graph.MutableGraph; -import org.graylog.events.contentpack.entities.EventProcessorConfigEntity; -import org.graylog.events.processor.EventDefinition; -import org.graylog.events.processor.EventProcessorConfig; -import org.graylog.events.processor.EventProcessorExecutionJob; -import org.graylog.events.processor.EventProcessorSchedulerConfig; -import org.graylog.scheduler.JobSchedule; -import org.graylog.scheduler.clock.JobSchedulerClock; -import org.graylog.scheduler.schedule.IntervalJobSchedule; -import org.graylog.security.UserContext; -import org.graylog2.contentpacks.EntityDescriptorIds; -import org.graylog2.contentpacks.model.entities.EntityDescriptor; -import org.graylog2.plugin.indexer.searches.timeranges.AbsoluteRange; -import org.graylog2.plugin.rest.ValidationResult; -import org.joda.time.DateTime; - -import java.util.Collections; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -@AutoValue -@JsonTypeName(MongoDBEventProcessorConfig.TYPE_NAME) -@JsonDeserialize(builder = MongoDBEventProcessorConfig.Builder.class) -public abstract class MongoDBEventProcessorConfig implements EventProcessorConfig { - public static final String TYPE_NAME = "mongodb-v1"; - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - private static final String FIELD_AGGREGATION_PIPELINE = "aggregation_pipeline"; - private static final String FIELD_TIMESTAMP_FIELD = "timestamp_field"; - private static final String FIELD_SEARCH_WITHIN_SECONDS = "search_within_seconds"; - private static final String FIELD_EXECUTE_EVERY_SECONDS = "execute_every_seconds"; - - @JsonProperty(FIELD_AGGREGATION_PIPELINE) - public abstract String aggregationPipeline(); - - @JsonProperty(FIELD_TIMESTAMP_FIELD) - public abstract String timestampField(); - - // Ignored for START_OF_DAY mode - @JsonProperty(FIELD_SEARCH_WITHIN_SECONDS) - public abstract long searchWithinSeconds(); - - @JsonProperty(FIELD_EXECUTE_EVERY_SECONDS) - public abstract long executeEverySeconds(); - - @Override - public Set requiredPermissions() { - // MongoDB event processor doesn't require specific stream permissions - return Collections.emptySet(); - } - - @Override - public boolean isUserPresentable() { - // This type is an internal base type not intended for direct use via the API. - // Use specific subtypes (e.g., traffic-v1) instead. - return false; - } - - public static Builder builder() { - return Builder.create(); - } - - public abstract Builder toBuilder(); - - @Override - public Optional toJobSchedulerConfig(EventDefinition eventDefinition, JobSchedulerClock clock) { - return createSchedulerConfig(eventDefinition, clock, searchWithinSeconds(), executeEverySeconds()); - } - - public static Optional createSchedulerConfig(EventDefinition eventDefinition, - JobSchedulerClock clock, - long searchWithinSeconds, - long executeEverySeconds) { - return createSchedulerConfig( - eventDefinition, clock, searchWithinSeconds, executeEverySeconds, - true); // DB events are not generally expected to be linked to time-stamped objects, so no catch-up by default - } - - /** - * Helper method to create scheduler config from time parameters with optional catch-up control. - * - * @param disableCatchup if false, enables catch-up behavior when processor falls behind; if true, disables catch-up - */ - public static Optional createSchedulerConfig(EventDefinition eventDefinition, - JobSchedulerClock clock, - long searchWithinSeconds, - long executeEverySeconds, - boolean disableCatchup) { - final DateTime now = clock.nowUTC(); - - // Convert seconds to milliseconds for scheduler - final long searchWithinMs = searchWithinSeconds * 1000; - final long executeEveryMs = executeEverySeconds * 1000; - - // Create interval-based schedule - final JobSchedule schedule = IntervalJobSchedule.builder() - .interval(executeEveryMs) - .unit(TimeUnit.MILLISECONDS) - .build(); - - // Initial timerange for first execution - final AbsoluteRange timerange = AbsoluteRange.create(now.minus(searchWithinMs), now); - - final EventProcessorExecutionJob.Config jobDefinitionConfig = EventProcessorExecutionJob.Config.builder() - .eventDefinitionId(eventDefinition.id()) - .processingWindowSize(searchWithinMs) - .processingHopSize(executeEveryMs) - .disableCatchup(disableCatchup) - .parameters(MongoDBEventProcessorParameters.builder() - .timerange(timerange) - .build()) - .build(); - - return Optional.of(EventProcessorSchedulerConfig.create(jobDefinitionConfig, schedule)); - } - - @AutoValue.Builder - public abstract static class Builder implements EventProcessorConfig.Builder { - @JsonCreator - public static Builder create() { - return new AutoValue_MongoDBEventProcessorConfig.Builder() - .type(TYPE_NAME) - .timestampField("bucket") // Default timestamp field - .searchWithinSeconds(60) // Default: 1 minute - .executeEverySeconds(60); // Default: 1 minute - } - - @JsonProperty(FIELD_AGGREGATION_PIPELINE) - public abstract Builder aggregationPipeline(String aggregationPipeline); - - @JsonProperty(FIELD_TIMESTAMP_FIELD) - public abstract Builder timestampField(String timestampField); - - @JsonProperty(FIELD_SEARCH_WITHIN_SECONDS) - public abstract Builder searchWithinSeconds(long searchWithinSeconds); - - @JsonProperty(FIELD_EXECUTE_EVERY_SECONDS) - public abstract Builder executeEverySeconds(long executeEverySeconds); - - public abstract MongoDBEventProcessorConfig build(); - } - - @Override - public ValidationResult validate(UserContext userContext) { - final ValidationResult validationResult = new ValidationResult(); - - // The mongodb-v1 type is an internal base type and must not be created directly via the API. - // Specific subtypes (e.g., traffic-v1) have their own configs and validation. - validationResult.addError("type", - "The mongodb-v1 event processor type cannot be created directly. Use a specific subtype instead."); - - validateField(timestampField() != null && !timestampField().isBlank(), - FIELD_TIMESTAMP_FIELD, "Timestamp field must not be empty", validationResult); - validateField(searchWithinSeconds() > 0, - FIELD_SEARCH_WITHIN_SECONDS, "Search window must be greater than 0", validationResult); - validateField(executeEverySeconds() > 0, - FIELD_EXECUTE_EVERY_SECONDS, "Execution interval must be greater than 0", validationResult); - validateAggregationPipeline(validationResult); - - return validationResult; - } - - private void validateField(boolean valid, String field, String errorMsg, ValidationResult validationResult) { - if (!valid) { - validationResult.addError(field, errorMsg); - } - } - - private void validateAggregationPipeline(ValidationResult validationResult) { - if (aggregationPipeline() == null || aggregationPipeline().isBlank()) { - validationResult.addError(FIELD_AGGREGATION_PIPELINE, "Aggregation pipeline is required"); - return; - } - try { - final JsonNode pipeline = OBJECT_MAPPER.readTree(aggregationPipeline()); - if (!pipeline.isArray()) { - validationResult.addError(FIELD_AGGREGATION_PIPELINE, "Aggregation pipeline must be a JSON array"); - } else if (pipeline.isEmpty()) { - validationResult.addError(FIELD_AGGREGATION_PIPELINE, "Aggregation pipeline must have at least one stage"); - } else { - // $out and $merge can only appear as top-level pipeline stages, so a shallow check is sufficient. - for (final JsonNode stage : pipeline) { - if (stage.has("$out") || stage.has("$merge")) { - validationResult.addError(FIELD_AGGREGATION_PIPELINE, - "Aggregation pipeline must not contain $out or $merge stages"); - break; - } - } - } - } catch (Exception e) { - validationResult.addError(FIELD_AGGREGATION_PIPELINE, - "Invalid aggregation pipeline JSON: " + e.getMessage()); - } - } - - @Override - public EventProcessorConfigEntity toContentPackEntity(EntityDescriptorIds entityDescriptorIds) { - // Content pack support is not implemented for MongoDBEventProcessorConfig yet. - throw new UnsupportedOperationException("Content pack export is not supported for MongoDBEventProcessorConfig"); - } - - @Override - public void resolveNativeEntity(EntityDescriptor entityDescriptor, MutableGraph mutableGraph) { - // No external dependencies to resolve - } -} diff --git a/graylog2-server/src/main/java/org/graylog/events/processor/mongodb/MongoDBEventProcessorParameters.java b/graylog2-server/src/main/java/org/graylog/events/processor/mongodb/MongoDBEventProcessorParameters.java deleted file mode 100644 index 26050a476e..0000000000 --- a/graylog2-server/src/main/java/org/graylog/events/processor/mongodb/MongoDBEventProcessorParameters.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (C) 2020 Graylog, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - */ -package org.graylog.events.processor.mongodb; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.google.auto.value.AutoValue; -import org.graylog.events.processor.EventProcessorParametersWithTimerange; -import org.graylog2.plugin.indexer.searches.timeranges.AbsoluteRange; -import org.graylog2.plugin.indexer.searches.timeranges.RelativeRange; -import org.joda.time.DateTime; - -import static com.google.common.base.Preconditions.checkArgument; -import static java.util.Objects.requireNonNull; - -@AutoValue -@JsonTypeName(MongoDBEventProcessorConfig.TYPE_NAME) -@JsonDeserialize(builder = MongoDBEventProcessorParameters.Builder.class) -public abstract class MongoDBEventProcessorParameters implements EventProcessorParametersWithTimerange { - - @Override - public EventProcessorParametersWithTimerange withTimerange(DateTime from, DateTime to) { - requireNonNull(from, "from cannot be null"); - requireNonNull(to, "to cannot be null"); - checkArgument(to.isAfter(from), "to must be after from"); - - return toBuilder().timerange(AbsoluteRange.create(from, to)).build(); - } - - public abstract Builder toBuilder(); - - public static Builder builder() { - return Builder.create(); - } - - @AutoValue.Builder - public abstract static class Builder implements EventProcessorParametersWithTimerange.Builder { - @JsonCreator - public static Builder create() { - return new AutoValue_MongoDBEventProcessorParameters.Builder() - .type(MongoDBEventProcessorConfig.TYPE_NAME) - .timerange(RelativeRange.create(3600)); - } - - public abstract MongoDBEventProcessorParameters build(); - } -} diff --git a/graylog2-server/src/test/java/org/graylog/events/processor/mongodb/MongoDBEventProcessorConfigTest.java b/graylog2-server/src/test/java/org/graylog/events/processor/mongodb/MongoDBEventProcessorConfigTest.java deleted file mode 100644 index 5660a2395c..0000000000 --- a/graylog2-server/src/test/java/org/graylog/events/processor/mongodb/MongoDBEventProcessorConfigTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright (C) 2020 Graylog, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - */ -package org.graylog.events.processor.mongodb; - -import org.graylog2.plugin.rest.ValidationResult; -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.Assertions.assertThat; - -class MongoDBEventProcessorConfigTest { - - private static final String VALID_PIPELINE = """ - [{"$group": {"_id": null, "count": {"$sum": 1}}}]"""; - - private MongoDBEventProcessorConfig validConfig() { - return MongoDBEventProcessorConfig.builder() - - .aggregationPipeline(VALID_PIPELINE) - .timestampField("bucket") - .searchWithinSeconds(60) - .executeEverySeconds(60) - .build(); - } - - @Test - void isUserPresentable_returnsFalse() { - final MongoDBEventProcessorConfig config = validConfig(); - assertThat(config.isUserPresentable()).isFalse(); - } - - @Test - void validate_alwaysRejectsDirectUse() { - final MongoDBEventProcessorConfig config = validConfig(); - final ValidationResult result = config.validate(null); - - assertThat(result.failed()).isTrue(); - assertThat(result.getErrors()).containsKey("type"); - } - - @Test - void validate_rejectsPipelineWithOutOperator() { - final MongoDBEventProcessorConfig config = MongoDBEventProcessorConfig.builder() - - .aggregationPipeline("[{\"$out\": \"malicious_collection\"}]") - .timestampField("bucket") - .searchWithinSeconds(60) - .executeEverySeconds(60) - .build(); - - final ValidationResult result = config.validate(null); - - assertThat(result.failed()).isTrue(); - assertThat(result.getErrors()).containsKey("aggregation_pipeline"); - assertThat(result.getErrors().get("aggregation_pipeline").toString()) - .contains("$out or $merge"); - } - - @Test - void validate_rejectsPipelineWithMergeOperator() { - final MongoDBEventProcessorConfig config = MongoDBEventProcessorConfig.builder() - - .aggregationPipeline("[{\"$merge\": {\"into\": \"malicious_collection\"}}]") - .timestampField("bucket") - .searchWithinSeconds(60) - .executeEverySeconds(60) - .build(); - - final ValidationResult result = config.validate(null); - - assertThat(result.failed()).isTrue(); - assertThat(result.getErrors()).containsKey("aggregation_pipeline"); - assertThat(result.getErrors().get("aggregation_pipeline").toString()) - .contains("$out or $merge"); - } - - @Test - void validate_validPipelineDoesNotProducePipelineErrors() { - final MongoDBEventProcessorConfig config = validConfig(); - final ValidationResult result = config.validate(null); - - // The type error is always present, but no pipeline-specific errors - assertThat(result.getErrors()).containsKey("type"); - assertThat(result.getErrors()).doesNotContainKey("aggregation_pipeline"); - } - - @Test - void validate_rejectsEmptyPipeline() { - final MongoDBEventProcessorConfig config = MongoDBEventProcessorConfig.builder() - - .aggregationPipeline("") - .timestampField("bucket") - .searchWithinSeconds(60) - .executeEverySeconds(60) - .build(); - - final ValidationResult result = config.validate(null); - - assertThat(result.failed()).isTrue(); - assertThat(result.getErrors()).containsKey("aggregation_pipeline"); - } -} diff --git a/graylog2-server/src/test/java/org/graylog/events/processor/mongodb/MongoDBEventProcessorTest.java b/graylog2-server/src/test/java/org/graylog/events/processor/mongodb/MongoDBEventProcessorTest.java deleted file mode 100644 index 8ff4f11a9d..0000000000 --- a/graylog2-server/src/test/java/org/graylog/events/processor/mongodb/MongoDBEventProcessorTest.java +++ /dev/null @@ -1,407 +0,0 @@ -/* - * Copyright (C) 2020 Graylog, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - */ -package org.graylog.events.processor.mongodb; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.mongodb.MongoException; -import com.mongodb.client.AggregateIterable; -import org.bson.Document; -import org.bson.types.ObjectId; -import org.graylog.events.event.Event; -import org.graylog.events.event.EventFactory; -import org.graylog.events.event.EventReplayInfo; -import org.graylog.events.event.EventWithContext; -import org.graylog.events.fields.FieldValue; -import org.graylog.events.processor.DBEventProcessorStateService; -import org.graylog.events.processor.EventConsumer; -import org.graylog.events.processor.EventDefinition; -import org.graylog.events.processor.EventProcessorException; -import org.graylog.events.processor.EventProcessorParameters; -import org.graylog2.database.MongoCollections; -import org.graylog2.plugin.Message; -import org.graylog2.plugin.MessageFactory; -import org.graylog2.plugin.indexer.searches.timeranges.AbsoluteRange; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -class MongoDBEventProcessorTest { - - private static final String COLLECTION_NAME = "test_collection"; - private static final String VALID_PIPELINE = """ - [{"$group": {"_id": null, "count": {"$sum": 1}}}]"""; - - @Mock - private MongoCollections mongoCollections; - @Mock - private DBEventProcessorStateService stateService; - @Mock - private MessageFactory messageFactory; - @Mock - private EventDefinition eventDefinition; - @Mock - private EventFactory eventFactory; - - private final ObjectMapper objectMapper = new ObjectMapper(); - - /** - * Minimal concrete subclass of MongoDBEventProcessor for testing the base class behavior. - * Does not override createEvents — all events pass through unfiltered. - */ - static class TestableMongoDBEventProcessor extends MongoDBEventProcessor { - TestableMongoDBEventProcessor(EventDefinition eventDefinition, - MongoDBEventProcessorConfig config, - String collectionName, - MongoCollections mongoCollections, - DBEventProcessorStateService stateService, - MessageFactory messageFactory, - ObjectMapper objectMapper) { - super(eventDefinition, config, collectionName, mongoCollections, stateService, messageFactory, objectMapper); - } - } - - @BeforeEach - void setUp() { - lenient().when(eventDefinition.id()).thenReturn("test-event-def-id"); - lenient().when(eventDefinition.title()).thenReturn("Test Event"); - } - - private TestableMongoDBEventProcessor createProcessor(MongoDBEventProcessorConfig config) { - return new TestableMongoDBEventProcessor(eventDefinition, config, COLLECTION_NAME, - mongoCollections, stateService, messageFactory, objectMapper); - } - - private MongoDBEventProcessorConfig createConfig() { - return createConfig(VALID_PIPELINE); - } - - private MongoDBEventProcessorConfig createConfig(String pipeline) { - return MongoDBEventProcessorConfig.builder() - .aggregationPipeline(pipeline) - .timestampField("bucket") - .searchWithinSeconds(60) - .executeEverySeconds(60) - .build(); - } - - private MongoDBEventProcessorParameters createParams(DateTime from, DateTime to) { - return MongoDBEventProcessorParameters.builder() - .timerange(AbsoluteRange.create(from, to)) - .build(); - } - - /** - * Creates a mock Event that tracks setField/getField calls via a backing HashMap, - * so that field values set by the processor can be inspected in assertions. - */ - private Event createFieldTrackingEvent() { - final Map fields = new HashMap<>(); - final Event event = mock(Event.class); - doAnswer(inv -> { - fields.put(inv.getArgument(0), inv.getArgument(1)); - return null; - }).when(event).setField(anyString(), any(FieldValue.class)); - lenient().when(event.getField(anyString())).thenAnswer(inv -> fields.get(inv.getArgument(0))); - lenient().when(event.getMessage()).thenReturn("Test Event"); - return event; - } - - @SuppressWarnings("unchecked") - private void setupAggregation(Document result) { - final com.mongodb.client.MongoCollection rawCollection = mock(com.mongodb.client.MongoCollection.class); - final AggregateIterable aggregateIterable = mock(AggregateIterable.class); - - when(mongoCollections.nonEntityCollection(eq(COLLECTION_NAME), eq(Document.class))).thenReturn(rawCollection); - when(rawCollection.aggregate(any(List.class), eq(Document.class))).thenReturn(aggregateIterable); - when(aggregateIterable.batchSize(anyInt())).thenReturn(aggregateIterable); - when(aggregateIterable.first()).thenReturn(result); - } - - @SuppressWarnings("unchecked") - private void setupAggregationThrows(RuntimeException exception) { - final com.mongodb.client.MongoCollection rawCollection = mock(com.mongodb.client.MongoCollection.class); - final AggregateIterable aggregateIterable = mock(AggregateIterable.class); - - when(mongoCollections.nonEntityCollection(eq(COLLECTION_NAME), eq(Document.class))).thenReturn(rawCollection); - when(rawCollection.aggregate(any(List.class), eq(Document.class))).thenReturn(aggregateIterable); - when(aggregateIterable.batchSize(anyInt())).thenReturn(aggregateIterable); - when(aggregateIterable.first()).thenThrow(exception); - } - - // ==================== Invalid Parameters ==================== - - @Test - void createEvents_invalidParameterType_throwsException() { - final var processor = createProcessor(createConfig()); - final EventProcessorParameters invalidParams = mock(EventProcessorParameters.class); - - @SuppressWarnings("unchecked") - final EventConsumer> consumer = mock(EventConsumer.class); - - assertThatThrownBy(() -> processor.createEvents(eventFactory, invalidParams, consumer)) - .isInstanceOf(EventProcessorException.class) - .hasMessageContaining("Invalid parameters type") - .hasMessageContaining("MongoDBEventProcessorParameters"); - } - - // ==================== Event Field Population ==================== - - @SuppressWarnings("unchecked") - @Test - void createEvents_setsEventFieldsFromAggregation() throws Exception { - final Document result = new Document() - .append("_id", null) - .append("count", 42) - .append("total_bytes", 1024L); - - setupAggregation(result); - - final Event event = createFieldTrackingEvent(); - when(eventFactory.createEvent(any(), any(), any())).thenReturn(event); - when(messageFactory.createMessage(anyString(), anyString(), any(DateTime.class))).thenReturn(mock(Message.class)); - - final var processor = createProcessor(createConfig()); - final EventConsumer> consumer = mock(EventConsumer.class); - final DateTime from = DateTime.now(DateTimeZone.UTC).minusHours(1); - final DateTime to = DateTime.now(DateTimeZone.UTC); - - processor.createEvents(eventFactory, createParams(from, to), consumer); - - // Aggregation fields should be set on the event (as string values) - assertThat(event.getField("count")).isNotNull(); - assertThat(event.getField("count").value()).isEqualTo("42"); - assertThat(event.getField("total_bytes")).isNotNull(); - assertThat(event.getField("total_bytes").value()).isEqualTo("1024"); - - // _id should be skipped - assertThat(event.getField("_id")).isNull(); - } - - // ==================== Event Metadata ==================== - - @SuppressWarnings("unchecked") - @Test - void createEvents_setsEventMetadata() throws Exception { - final Document result = new Document("count", 1); - setupAggregation(result); - - final Event event = createFieldTrackingEvent(); - when(eventFactory.createEvent(any(), any(), any())).thenReturn(event); - when(messageFactory.createMessage(anyString(), anyString(), any(DateTime.class))).thenReturn(mock(Message.class)); - - final var processor = createProcessor(createConfig()); - final EventConsumer> consumer = mock(EventConsumer.class); - final DateTime from = new DateTime(2026, 3, 5, 0, 0, DateTimeZone.UTC); - final DateTime to = new DateTime(2026, 3, 5, 12, 0, DateTimeZone.UTC); - - processor.createEvents(eventFactory, createParams(from, to), consumer); - - // Event should be created with the end-of-timerange timestamp and event definition title - verify(eventFactory).createEvent(eq(eventDefinition), eq(to), eq("Test Event")); - - // Timerange should be set - verify(event).setTimerangeStart(from); - verify(event).setTimerangeEnd(to); - - // Origin context should reference the collection and time range - final ArgumentCaptor originCaptor = ArgumentCaptor.forClass(String.class); - verify(event).setOriginContext(originCaptor.capture()); - assertThat(originCaptor.getValue()) - .contains(COLLECTION_NAME) - .contains(String.valueOf(from.getMillis())) - .contains(String.valueOf(to.getMillis())); - - // Replay info should be set with timerange and pipeline query - final ArgumentCaptor replayCaptor = ArgumentCaptor.forClass(EventReplayInfo.class); - verify(event).setReplayInfo(replayCaptor.capture()); - final EventReplayInfo replayInfo = replayCaptor.getValue(); - assertThat(replayInfo.timerangeStart()).isEqualTo(from); - assertThat(replayInfo.timerangeEnd()).isEqualTo(to); - assertThat(replayInfo.query()).isEqualTo(VALID_PIPELINE); - } - - // ==================== Message Creation ==================== - - @SuppressWarnings("unchecked") - @Test - void createEvents_createsMessageWithAggregationResult() throws Exception { - final Document result = new Document() - .append("_id", null) - .append("count", 42); - setupAggregation(result); - - final Event event = createFieldTrackingEvent(); - when(eventFactory.createEvent(any(), any(), any())).thenReturn(event); - - final Message message = mock(Message.class); - when(messageFactory.createMessage(anyString(), anyString(), any(DateTime.class))).thenReturn(message); - - final var processor = createProcessor(createConfig()); - final EventConsumer> consumer = mock(EventConsumer.class); - final DateTime from = new DateTime(2026, 3, 5, 0, 0, DateTimeZone.UTC); - final DateTime to = new DateTime(2026, 3, 5, 12, 0, DateTimeZone.UTC); - - processor.createEvents(eventFactory, createParams(from, to), consumer); - - // Message should be created with collection name in text, at the end-of-timerange timestamp - final ArgumentCaptor msgTextCaptor = ArgumentCaptor.forClass(String.class); - verify(messageFactory).createMessage(msgTextCaptor.capture(), eq("mongodb-event-processor"), eq(to)); - assertThat(msgTextCaptor.getValue()).contains(COLLECTION_NAME); - - // Message should have aggregation fields added - @SuppressWarnings("unchecked") - final ArgumentCaptor> fieldsCaptor = ArgumentCaptor.forClass(Map.class); - verify(message).addFields(fieldsCaptor.capture()); - final Map messageFields = fieldsCaptor.getValue(); - assertThat(messageFields).containsKey("aggregation_result"); - assertThat(messageFields).containsKey("timerange_start"); - assertThat(messageFields).containsKey("timerange_end"); - assertThat(messageFields).containsEntry("count", 42); - // _id should be skipped in message fields too - assertThat(messageFields).doesNotContainKey("_id"); - } - - // ==================== State Service ==================== - - @SuppressWarnings("unchecked") - @Test - void createEvents_updatesStateOnResult() throws Exception { - final Document result = new Document("count", 1); - setupAggregation(result); - - final Event event = createFieldTrackingEvent(); - when(eventFactory.createEvent(any(), any(), any())).thenReturn(event); - when(messageFactory.createMessage(anyString(), anyString(), any(DateTime.class))).thenReturn(mock(Message.class)); - - final var processor = createProcessor(createConfig()); - final EventConsumer> consumer = mock(EventConsumer.class); - final DateTime from = new DateTime(2026, 3, 5, 0, 0, DateTimeZone.UTC); - final DateTime to = new DateTime(2026, 3, 5, 12, 0, DateTimeZone.UTC); - - processor.createEvents(eventFactory, createParams(from, to), consumer); - - verify(stateService).setState("test-event-def-id", from, to); - } - - @SuppressWarnings("unchecked") - @Test - void createEvents_updatesStateOnNoResult() throws Exception { - setupAggregation(null); - - final var processor = createProcessor(createConfig()); - final EventConsumer> consumer = mock(EventConsumer.class); - final DateTime from = new DateTime(2026, 3, 5, 0, 0, DateTimeZone.UTC); - final DateTime to = new DateTime(2026, 3, 5, 12, 0, DateTimeZone.UTC); - - processor.createEvents(eventFactory, createParams(from, to), consumer); - - // State should still be updated even with no results - verify(stateService).setState("test-event-def-id", from, to); - // Consumer should NOT be called - verify(consumer, never()).accept(any()); - } - - // ==================== Exception Handling ==================== - - @SuppressWarnings("unchecked") - @Test - void createEvents_wrapsAggregationException() { - setupAggregationThrows(new MongoException("Connection lost")); - - final var processor = createProcessor(createConfig()); - final EventConsumer> consumer = mock(EventConsumer.class); - final DateTime from = DateTime.now(DateTimeZone.UTC).minusHours(1); - final DateTime to = DateTime.now(DateTimeZone.UTC); - - assertThatThrownBy(() -> processor.createEvents(eventFactory, createParams(from, to), consumer)) - .isInstanceOf(EventProcessorException.class) - .hasMessageContaining("Failed to execute MongoDB aggregation") - .hasMessageContaining("Test Event") - .hasCauseInstanceOf(MongoException.class); - } - - // ==================== Value Conversion ==================== - - @SuppressWarnings("unchecked") - @Test - void createEvents_convertsMongoValueTypes() throws Exception { - final Date javaDate = new Date(1709596800000L); // 2024-03-05T00:00:00Z - final ObjectId objectId = new ObjectId("507f1f77bcf86cd799439011"); - final Document result = new Document() - .append("date_field", javaDate) - .append("objectid_field", objectId) - .append("number_field", 12345L) - .append("string_field", "hello") - .append("null_field", null); - setupAggregation(result); - - final Event event = createFieldTrackingEvent(); - when(eventFactory.createEvent(any(), any(), any())).thenReturn(event); - when(messageFactory.createMessage(anyString(), anyString(), any(DateTime.class))).thenReturn(mock(Message.class)); - - final var processor = createProcessor(createConfig()); - final EventConsumer> consumer = mock(EventConsumer.class); - final DateTime from = DateTime.now(DateTimeZone.UTC).minusHours(1); - final DateTime to = DateTime.now(DateTimeZone.UTC); - - processor.createEvents(eventFactory, createParams(from, to), consumer); - - // Date should be converted to a DateTime string representation - assertThat(event.getField("date_field")).isNotNull(); - assertThat(event.getField("date_field").value()).contains("2024-03-05"); - - // ObjectId should be converted to its string representation - assertThat(event.getField("objectid_field")).isNotNull(); - assertThat(event.getField("objectid_field").value()).isEqualTo("507f1f77bcf86cd799439011"); - - // Numbers should be preserved (as string via FieldValue.string()) - assertThat(event.getField("number_field")).isNotNull(); - assertThat(event.getField("number_field").value()).isEqualTo("12345"); - - // Strings should be preserved - assertThat(event.getField("string_field")).isNotNull(); - assertThat(event.getField("string_field").value()).isEqualTo("hello"); - - // Null values should result in "null" string (String.valueOf(null)) - assertThat(event.getField("null_field")).isNotNull(); - assertThat(event.getField("null_field").value()).isEqualTo("null"); - } -}