mirror of
https://github.com/Graylog2/graylog2-server.git
synced 2026-03-13 09:32:21 +08:00
Refactor event summary template logic to EventModifier (#24757)
* Refactor event summary template logic to EventModifier * cl * add event definition fields to model data --------- Co-authored-by: Ryan Carroll <ryan.carroll@graylog.com>
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
type = "a"
|
||||
message = "Added support for customizing titles of events with new configuration option in Event Definitions."
|
||||
|
||||
pulls = ["24578", "graylog-plugin-enterprise#12830"]
|
||||
pulls = ["24578", "24757", "graylog-plugin-enterprise#12830"]
|
||||
|
||||
|
||||
|
||||
@@ -57,6 +57,7 @@ import org.graylog.events.processor.aggregation.AggregationEventProcessorConfig;
|
||||
import org.graylog.events.processor.aggregation.AggregationEventProcessorParameters;
|
||||
import org.graylog.events.processor.aggregation.AggregationSearch;
|
||||
import org.graylog.events.processor.aggregation.PivotAggregationSearch;
|
||||
import org.graylog.events.processor.modifier.EventSummaryModifier;
|
||||
import org.graylog.events.processor.storage.EventStorageHandlerEngine;
|
||||
import org.graylog.events.processor.storage.PersistToStreamsStorageHandler;
|
||||
import org.graylog.events.processor.systemnotification.SystemNotificationEventProcessor;
|
||||
@@ -186,10 +187,11 @@ public class EventsModule extends PluginModule {
|
||||
|
||||
serviceBinder().addBinding().to(NotificationSystemEventPublisher.class).in(Scopes.SINGLETON);
|
||||
|
||||
eventModifierBinder(); // Initialize event modifier binding to avoid errors when no modifiers are bound.
|
||||
eventQuerySearchTypeSupplierBinder(); // Initialize binder to avoid errors when no suppliers are bound.
|
||||
|
||||
OptionalBinder.newOptionalBinder(binder(), EventProcedureProvider.class)
|
||||
.setDefault().to(DefaultEventProcedureProvider.class);
|
||||
|
||||
addEventModifier(EventSummaryModifier.class);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,12 +16,9 @@
|
||||
*/
|
||||
package org.graylog.events.processor.aggregation;
|
||||
|
||||
import com.floreysoft.jmte.Engine;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -59,16 +56,11 @@ import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.graylog.events.event.EventDto.FIELD_EVENT_DEFINITION_ID;
|
||||
import static org.graylog.events.notifications.EventNotificationModelData.FIELD_EVENT_DEFINITION_DESCRIPTION;
|
||||
import static org.graylog.events.notifications.EventNotificationModelData.FIELD_EVENT_DEFINITION_TITLE;
|
||||
import static org.graylog.events.notifications.EventNotificationModelData.FIELD_EVENT_DEFINITION_TYPE;
|
||||
import static org.graylog.events.search.MoreSearch.luceneEscape;
|
||||
|
||||
public class AggregationEventProcessor implements EventProcessor {
|
||||
@@ -88,7 +80,6 @@ public class AggregationEventProcessor implements EventProcessor {
|
||||
private final Messages messages;
|
||||
private final PermittedStreams permittedStreams;
|
||||
private final AggregationSearchUtils aggregationSearchUtils;
|
||||
private final Engine templateEngine;
|
||||
|
||||
@Inject
|
||||
public AggregationEventProcessor(@Assisted EventDefinition eventDefinition,
|
||||
@@ -100,8 +91,7 @@ public class AggregationEventProcessor implements EventProcessor {
|
||||
Messages messages,
|
||||
PermittedStreams permittedStreams,
|
||||
Set<EventQuerySearchTypeSupplier> eventQueryModifiers,
|
||||
MessageFactory messageFactory,
|
||||
Engine templateEngine) {
|
||||
MessageFactory messageFactory) {
|
||||
this.eventDefinition = eventDefinition;
|
||||
this.config = (AggregationEventProcessorConfig) eventDefinition.config();
|
||||
this.dependencyCheck = dependencyCheck;
|
||||
@@ -110,7 +100,6 @@ public class AggregationEventProcessor implements EventProcessor {
|
||||
this.eventStreamService = eventStreamService;
|
||||
this.messages = messages;
|
||||
this.permittedStreams = permittedStreams;
|
||||
this.templateEngine = templateEngine;
|
||||
// If this is a simple Filter search there is no need to initialize aggregationSearchUtils
|
||||
this.aggregationSearchUtils = config.series().isEmpty() ? null : new AggregationSearchUtils(
|
||||
eventDefinition,
|
||||
@@ -119,8 +108,7 @@ public class AggregationEventProcessor implements EventProcessor {
|
||||
aggregationSearchFactory,
|
||||
eventStreamService,
|
||||
messageFactory,
|
||||
permittedStreams,
|
||||
templateEngine
|
||||
permittedStreams
|
||||
);
|
||||
}
|
||||
|
||||
@@ -254,15 +242,6 @@ public class AggregationEventProcessor implements EventProcessor {
|
||||
for (final ResultMessage resultMessage : messages) {
|
||||
final Message msg = resultMessage.getMessage();
|
||||
final Event event = eventFactory.createEvent(eventDefinition, msg.getTimestamp(), eventDefinition.title());
|
||||
final String customEventSummary = eventDefinition.eventSummaryTemplate();
|
||||
if (!Strings.isNullOrEmpty(customEventSummary)) {
|
||||
final Map<String, Object> templateFields = Maps.newHashMap(msg.getFields());
|
||||
templateFields.put(FIELD_EVENT_DEFINITION_ID, eventDefinition.id());
|
||||
templateFields.put(FIELD_EVENT_DEFINITION_TITLE, eventDefinition.title());
|
||||
templateFields.put(FIELD_EVENT_DEFINITION_TYPE, eventDefinition.config().type());
|
||||
templateFields.put(FIELD_EVENT_DEFINITION_DESCRIPTION, eventDefinition.description());
|
||||
event.setMessage(templateEngine.transform(customEventSummary, templateFields));
|
||||
}
|
||||
event.setOriginContext(EventOriginContext.elasticsearchMessage(resultMessage.getIndex(), msg.getId()));
|
||||
|
||||
// Ensure the event has values in the "source_streams" field for permission checks to work
|
||||
|
||||
@@ -16,9 +16,7 @@
|
||||
*/
|
||||
package org.graylog.events.processor.aggregation;
|
||||
|
||||
import com.floreysoft.jmte.Engine;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
@@ -53,12 +51,6 @@ import java.util.Set;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.graylog.events.event.EventDto.FIELD_AGGREGATION_CONDITIONS;
|
||||
import static org.graylog.events.event.EventDto.FIELD_EVENT_DEFINITION_ID;
|
||||
import static org.graylog.events.notifications.EventNotificationModelData.FIELD_EVENT_DEFINITION_DESCRIPTION;
|
||||
import static org.graylog.events.notifications.EventNotificationModelData.FIELD_EVENT_DEFINITION_TITLE;
|
||||
import static org.graylog.events.notifications.EventNotificationModelData.FIELD_EVENT_DEFINITION_TYPE;
|
||||
|
||||
public class AggregationSearchUtils {
|
||||
private final Logger LOG = LoggerFactory.getLogger(AggregationSearchUtils.class);
|
||||
|
||||
@@ -69,7 +61,6 @@ public class AggregationSearchUtils {
|
||||
private final EventStreamService eventStreamService;
|
||||
private final MessageFactory messageFactory;
|
||||
private final PermittedStreams permittedStreams;
|
||||
private final Engine templateEngine;
|
||||
|
||||
public AggregationSearchUtils(EventDefinition eventDefinition,
|
||||
AggregationEventProcessorConfig config,
|
||||
@@ -77,8 +68,7 @@ public class AggregationSearchUtils {
|
||||
AggregationSearch.Factory aggregationSearchFactory,
|
||||
EventStreamService eventStreamService,
|
||||
MessageFactory messageFactory,
|
||||
PermittedStreams permittedStreams,
|
||||
Engine templateEngine) {
|
||||
PermittedStreams permittedStreams) {
|
||||
this.eventDefinition = eventDefinition;
|
||||
this.config = config;
|
||||
this.eventQueryModifiers = eventQueryModifiers;
|
||||
@@ -86,7 +76,6 @@ public class AggregationSearchUtils {
|
||||
this.eventStreamService = eventStreamService;
|
||||
this.messageFactory = messageFactory;
|
||||
this.permittedStreams = permittedStreams;
|
||||
this.templateEngine = templateEngine;
|
||||
}
|
||||
|
||||
public void aggregatedSearch(EventFactory eventFactory, AggregationEventProcessorParameters parameters,
|
||||
@@ -208,18 +197,6 @@ public class AggregationSearchUtils {
|
||||
|
||||
LOG.debug("Creating event {}/{} - {} {} ({})", eventDefinition.title(), eventDefinition.id(), keyResult.key(), seriesString(keyResult), fields);
|
||||
|
||||
// If the event definition has a custom event summary, transform and apply it to the event.
|
||||
final String customEventSummary = eventDefinition.eventSummaryTemplate();
|
||||
if (!Strings.isNullOrEmpty(customEventSummary)) {
|
||||
final Map<String, Object> templateFields = Maps.newHashMap(fields);
|
||||
templateFields.put(FIELD_AGGREGATION_CONDITIONS, event.getAggregationConditions());
|
||||
templateFields.put(FIELD_EVENT_DEFINITION_ID, eventDefinition.id());
|
||||
templateFields.put(FIELD_EVENT_DEFINITION_TITLE, eventDefinition.title());
|
||||
templateFields.put(FIELD_EVENT_DEFINITION_TYPE, eventDefinition.config().type());
|
||||
templateFields.put(FIELD_EVENT_DEFINITION_DESCRIPTION, eventDefinition.description());
|
||||
event.setMessage(templateEngine.transform(customEventSummary, templateFields));
|
||||
}
|
||||
|
||||
// TODO: Can we find a useful source value?
|
||||
final Message message = messageFactory.createMessage(event.getMessage(), "", result.effectiveTimerange().to());
|
||||
message.addFields(fields);
|
||||
|
||||
@@ -0,0 +1,64 @@
|
||||
/*
|
||||
* Copyright (C) 2020 Graylog, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*/
|
||||
package org.graylog.events.processor.modifier;
|
||||
|
||||
import com.floreysoft.jmte.Engine;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import jakarta.inject.Inject;
|
||||
import org.graylog.events.event.Event;
|
||||
import org.graylog.events.event.EventWithContext;
|
||||
import org.graylog.events.processor.EventDefinition;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.graylog.events.event.EventDto.FIELD_EVENT_DEFINITION_ID;
|
||||
import static org.graylog.events.notifications.EventNotificationModelData.FIELD_EVENT_DEFINITION_DESCRIPTION;
|
||||
import static org.graylog.events.notifications.EventNotificationModelData.FIELD_EVENT_DEFINITION_TITLE;
|
||||
import static org.graylog.events.notifications.EventNotificationModelData.FIELD_EVENT_DEFINITION_TYPE;
|
||||
|
||||
public class EventSummaryModifier implements EventModifier {
|
||||
private final Engine templateEngine;
|
||||
|
||||
@Inject
|
||||
public EventSummaryModifier(Engine templateEngine) {
|
||||
this.templateEngine = templateEngine;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(EventWithContext eventWithContext, EventDefinition eventDefinition) throws EventModifierException {
|
||||
if (!Strings.isNullOrEmpty(eventDefinition.eventSummaryTemplate())) {
|
||||
final ImmutableMap.Builder<String, Object> dataModelBuilder = ImmutableMap.builder();
|
||||
|
||||
dataModelBuilder.put(FIELD_EVENT_DEFINITION_ID, Objects.requireNonNull(eventDefinition.id()));
|
||||
dataModelBuilder.put(FIELD_EVENT_DEFINITION_TITLE, eventDefinition.title());
|
||||
dataModelBuilder.put(FIELD_EVENT_DEFINITION_TYPE, eventDefinition.config().type());
|
||||
dataModelBuilder.put(FIELD_EVENT_DEFINITION_DESCRIPTION, eventDefinition.description());
|
||||
|
||||
if (eventWithContext.messageContext().isPresent()) {
|
||||
dataModelBuilder.put("source", eventWithContext.messageContext().get().getFields());
|
||||
} else if (eventWithContext.eventContext().isPresent()) {
|
||||
dataModelBuilder.put("source", eventWithContext.eventContext().get().toDto().fields());
|
||||
}
|
||||
|
||||
final ImmutableMap<String, Object> dataModel = dataModelBuilder.build();
|
||||
|
||||
final Event event = eventWithContext.event();
|
||||
event.setMessage(templateEngine.transform(eventDefinition.eventSummaryTemplate(), dataModel));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -20,13 +20,10 @@ import com.floreysoft.jmte.Engine;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.graylog.events.event.Event;
|
||||
import org.graylog.events.event.EventFactory;
|
||||
import org.graylog.events.event.EventWithContext;
|
||||
import org.graylog.events.event.TestEvent;
|
||||
import org.graylog.events.notifications.EventNotificationSettings;
|
||||
import org.graylog.events.processor.DBEventProcessorStateService;
|
||||
import org.graylog.events.processor.EventDefinition;
|
||||
import org.graylog.events.processor.EventDefinitionDto;
|
||||
import org.graylog.events.processor.EventProcessorDependencyCheck;
|
||||
import org.graylog.events.processor.EventProcessorException;
|
||||
@@ -39,10 +36,7 @@ import org.graylog.plugins.views.search.searchfilters.model.UsedSearchFilter;
|
||||
import org.graylog.plugins.views.search.searchtypes.pivot.SeriesSpec;
|
||||
import org.graylog.plugins.views.search.searchtypes.pivot.series.Count;
|
||||
import org.graylog2.indexer.messages.Messages;
|
||||
import org.graylog2.indexer.results.ResultMessage;
|
||||
import org.graylog2.indexer.results.TestResultMessageFactory;
|
||||
import org.graylog2.notifications.NotificationService;
|
||||
import org.graylog2.plugin.Message;
|
||||
import org.graylog2.plugin.MessageFactory;
|
||||
import org.graylog2.plugin.MessageSummary;
|
||||
import org.graylog2.plugin.TestMessageFactory;
|
||||
@@ -65,17 +59,13 @@ import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatCode;
|
||||
import static org.graylog2.plugin.streams.Stream.NON_MESSAGE_STREAM_IDS;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.reset;
|
||||
import static org.mockito.Mockito.times;
|
||||
@@ -150,7 +140,7 @@ public class AggregationEventProcessorTest {
|
||||
.build();
|
||||
|
||||
final AggregationEventProcessor eventProcessor = new AggregationEventProcessor(eventDefinitionDto, searchFactory,
|
||||
eventProcessorDependencyCheck, stateService, moreSearch, eventStreamService, messages, permittedStreams, Set.of(), messageFactory, templateEngine);
|
||||
eventProcessorDependencyCheck, stateService, moreSearch, eventStreamService, messages, permittedStreams, Set.of(), messageFactory);
|
||||
|
||||
assertThatCode(() -> eventProcessor.createEvents(eventFactory, parameters, (events) -> {})).doesNotThrowAnyException();
|
||||
|
||||
@@ -186,7 +176,7 @@ public class AggregationEventProcessorTest {
|
||||
.build();
|
||||
|
||||
final AggregationEventProcessor eventProcessor = new AggregationEventProcessor(eventDefinitionDto, searchFactory,
|
||||
eventProcessorDependencyCheck, stateService, moreSearch, eventStreamService, messages, permittedStreams, Set.of(), messageFactory, templateEngine);
|
||||
eventProcessorDependencyCheck, stateService, moreSearch, eventStreamService, messages, permittedStreams, Set.of(), messageFactory);
|
||||
|
||||
// If the dependency check returns true, there should be no exception raised and the state service should be called
|
||||
when(eventProcessorDependencyCheck.hasMessagesIndexedUpTo(timerange)).thenReturn(true);
|
||||
@@ -301,68 +291,11 @@ public class AggregationEventProcessorTest {
|
||||
final EventDefinitionDto eventDefinitionDto = buildEventDefinitionDto(ImmutableSet.of(), ImmutableList.of(series), null, filters);
|
||||
final AggregationEventProcessor eventProcessor = new AggregationEventProcessor(
|
||||
eventDefinitionDto, searchFactory, eventProcessorDependencyCheck, stateService, moreSearch,
|
||||
eventStreamService, messages, permittedStreams, Set.of(), messageFactory, templateEngine);
|
||||
eventStreamService, messages, permittedStreams, Set.of(), messageFactory);
|
||||
|
||||
eventProcessor.sourceMessagesForEvent(event, messageConsumer, batchLimit);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createEventsAppliesCustomEventSummaryTemplate() throws Exception {
|
||||
when(eventProcessorDependencyCheck.hasMessagesIndexedUpTo(any(TimeRange.class))).thenReturn(true);
|
||||
when(streamService.getSystemStreamIds(false)).thenReturn(NON_MESSAGE_STREAM_IDS);
|
||||
|
||||
final DateTime now = DateTime.now(DateTimeZone.UTC);
|
||||
final AbsoluteRange timerange = AbsoluteRange.create(now.minusHours(1), now.minusHours(1).plusMillis(SEARCH_WINDOW_MS));
|
||||
|
||||
final AggregationEventProcessorConfig config = AggregationEventProcessorConfig.builder()
|
||||
.query(QUERY_STRING)
|
||||
.streams(ImmutableSet.of())
|
||||
.groupBy(ImmutableList.of())
|
||||
.series(ImmutableList.of())
|
||||
.conditions(null)
|
||||
.searchWithinMs(SEARCH_WINDOW_MS)
|
||||
.executeEveryMs(SEARCH_WINDOW_MS)
|
||||
.build();
|
||||
final EventDefinitionDto eventDefinitionDto = buildEventDefinitionDto(ImmutableSet.of(), ImmutableList.of(), null, emptyList())
|
||||
.toBuilder()
|
||||
.config(config)
|
||||
.eventSummaryTemplate("${user_name} failed to log in on ${host_name}")
|
||||
.build();
|
||||
final AggregationEventProcessorParameters parameters = AggregationEventProcessorParameters.builder()
|
||||
.timerange(timerange)
|
||||
.build();
|
||||
|
||||
final AggregationEventProcessor eventProcessor = new AggregationEventProcessor(eventDefinitionDto, searchFactory,
|
||||
eventProcessorDependencyCheck, stateService, moreSearch, eventStreamService, messages, permittedStreams, Set.of(), messageFactory, templateEngine);
|
||||
|
||||
final TestEvent createdEvent = new TestEvent(now);
|
||||
when(eventFactory.createEvent(any(EventDefinition.class), any(DateTime.class), anyString()))
|
||||
.thenReturn(createdEvent);
|
||||
|
||||
final Message message = messageFactory.createMessage("message", "src", now);
|
||||
message.addField("user_name", "test_user");
|
||||
message.addField("host_name", "host0001");
|
||||
message.addField("message", "message");
|
||||
final ResultMessage resultMessage = new TestResultMessageFactory().createFromMessage(message);
|
||||
resultMessage.setIndex("index-0");
|
||||
|
||||
doAnswer(invocation -> {
|
||||
final MoreSearch.ScrollCallback callback = invocation.getArgument(6);
|
||||
callback.call(List.of(resultMessage), new AtomicBoolean(true));
|
||||
return null;
|
||||
}).when(moreSearch).scrollQuery(eq(config.query()), any(), eq(config.filters()), eq(config.queryParameters()),
|
||||
eq(parameters.timerange()), eq(parameters.batchSize()), any(MoreSearch.ScrollCallback.class));
|
||||
|
||||
final List<List<EventWithContext>> capturedEvents = new ArrayList<>();
|
||||
eventProcessor.createEvents(eventFactory, parameters, capturedEvents::add);
|
||||
|
||||
assertThat(capturedEvents).hasSize(1);
|
||||
assertThat(capturedEvents.getFirst()).hasSize(1);
|
||||
final Event event = capturedEvents.getFirst().getFirst().event();
|
||||
assertThat(event.getMessage()).isEqualTo("test_user failed to log in on host0001");
|
||||
assertThat(createdEvent.getMessage()).isEqualTo("test_user failed to log in on host0001");
|
||||
}
|
||||
|
||||
// Helper method to build test EventDefinitionDto, since we only care about a few of the values
|
||||
private EventDefinitionDto buildEventDefinitionDto(
|
||||
Set<String> testStreams, List<SeriesSpec> testSeries, AggregationConditions testConditions, List<UsedSearchFilter> filters) {
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
*/
|
||||
package org.graylog.events.processor.aggregation;
|
||||
|
||||
import com.floreysoft.jmte.Engine;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
@@ -89,7 +88,6 @@ public class AggregationSearchUtilsTest {
|
||||
private PermittedStreams permittedStreams;
|
||||
private EventStreamService eventStreamService;
|
||||
private final MessageFactory messageFactory = new TestMessageFactory();
|
||||
private final Engine templateEngine = new Engine();
|
||||
|
||||
@BeforeEach
|
||||
public void setup() {
|
||||
@@ -132,8 +130,7 @@ public class AggregationSearchUtilsTest {
|
||||
searchFactory,
|
||||
eventStreamService,
|
||||
messageFactory,
|
||||
permittedStreams,
|
||||
templateEngine
|
||||
permittedStreams
|
||||
);
|
||||
|
||||
final AggregationResult result = AggregationResult.builder()
|
||||
@@ -204,64 +201,6 @@ public class AggregationSearchUtilsTest {
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEventsFromAggregationResultUsesCustomEventSummaryTemplate() throws EventProcessorException {
|
||||
final DateTime now = DateTime.now(DateTimeZone.UTC);
|
||||
final AbsoluteRange timerange = AbsoluteRange.create(now.minusHours(1), now.minusHours(1).plusMillis(SEARCH_WINDOW_MS));
|
||||
|
||||
final TestEvent event1 = new TestEvent(timerange.to());
|
||||
when(eventFactory.createEvent(any(EventDefinition.class), any(DateTime.class), anyString()))
|
||||
.thenReturn(event1);
|
||||
|
||||
final EventDefinitionDto eventDefinitionDto = buildEventDefinitionDto(ImmutableSet.of("stream-2"), ImmutableList.of(), null, emptyList())
|
||||
.toBuilder()
|
||||
.eventSummaryTemplate("Aggregation on ${group_field_one} and ${group_field_two} has source count of ${aggregation_conditions.count_source}")
|
||||
.build();
|
||||
final AggregationEventProcessorParameters parameters = AggregationEventProcessorParameters.builder()
|
||||
.timerange(timerange)
|
||||
.build();
|
||||
|
||||
final AggregationSearchUtils searchUtils = new AggregationSearchUtils(
|
||||
eventDefinitionDto,
|
||||
(AggregationEventProcessorConfig) eventDefinitionDto.config(),
|
||||
Set.of(),
|
||||
searchFactory,
|
||||
eventStreamService,
|
||||
messageFactory,
|
||||
permittedStreams,
|
||||
templateEngine
|
||||
);
|
||||
|
||||
final AggregationResult result = AggregationResult.builder()
|
||||
.effectiveTimerange(timerange)
|
||||
.totalAggregatedMessages(1)
|
||||
.sourceStreams(ImmutableSet.of("stream-1", "stream-2"))
|
||||
.keyResults(ImmutableList.of(
|
||||
AggregationKeyResult.builder()
|
||||
.key(ImmutableList.of("one", "two"))
|
||||
.timestamp(timerange.to())
|
||||
.seriesValues(ImmutableList.of(
|
||||
AggregationSeriesValue.builder()
|
||||
.key(ImmutableList.of("a"))
|
||||
.value(42.0d)
|
||||
.series(Count.builder()
|
||||
.id("abc123")
|
||||
.field("source")
|
||||
.build())
|
||||
.build()
|
||||
))
|
||||
.build()
|
||||
))
|
||||
.build();
|
||||
|
||||
final ImmutableList<EventWithContext> eventsWithContext = searchUtils.eventsFromAggregationResult(eventFactory, parameters, result, (event) -> {});
|
||||
|
||||
assertThat(eventsWithContext).hasSize(1);
|
||||
final EventWithContext eventWithContext = eventsWithContext.getFirst();
|
||||
assertThat(eventWithContext.event().getMessage()).isEqualTo("Aggregation on one and two has source count of 42.0");
|
||||
assertThat(eventWithContext.messageContext().orElseThrow().getMessage()).isEqualTo("Aggregation on one and two has source count of 42.0");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEventsFromAggregationResultWithEventModifierState() throws EventProcessorException {
|
||||
final DateTime now = DateTime.now(DateTimeZone.UTC);
|
||||
@@ -302,8 +241,7 @@ public class AggregationSearchUtilsTest {
|
||||
searchFactory,
|
||||
eventStreamService,
|
||||
messageFactory,
|
||||
permittedStreams,
|
||||
templateEngine
|
||||
permittedStreams
|
||||
);
|
||||
|
||||
final AggregationResult result = AggregationResult.builder()
|
||||
@@ -389,8 +327,7 @@ public class AggregationSearchUtilsTest {
|
||||
searchFactory,
|
||||
eventStreamService,
|
||||
messageFactory,
|
||||
permittedStreams,
|
||||
templateEngine
|
||||
permittedStreams
|
||||
);
|
||||
|
||||
final AggregationResult result = AggregationResult.builder()
|
||||
@@ -506,8 +443,7 @@ public class AggregationSearchUtilsTest {
|
||||
searchFactory,
|
||||
eventStreamService,
|
||||
messageFactory,
|
||||
permittedStreams,
|
||||
templateEngine
|
||||
permittedStreams
|
||||
);
|
||||
|
||||
final AggregationResult result = AggregationResult.builder()
|
||||
@@ -599,8 +535,7 @@ public class AggregationSearchUtilsTest {
|
||||
searchFactory,
|
||||
eventStreamService,
|
||||
messageFactory,
|
||||
permittedStreams,
|
||||
templateEngine
|
||||
permittedStreams
|
||||
);
|
||||
final AggregationResult result = buildAggregationResult(timerange, timerange.to(), ImmutableList.of("one", "two"));
|
||||
final ImmutableList<EventWithContext> eventsWithContext = searchUtils.eventsFromAggregationResult(eventFactory, parameters, result, (event) -> {});
|
||||
@@ -663,8 +598,7 @@ public class AggregationSearchUtilsTest {
|
||||
searchFactory,
|
||||
eventStreamService,
|
||||
messageFactory,
|
||||
permittedStreams,
|
||||
templateEngine
|
||||
permittedStreams
|
||||
);
|
||||
final AggregationResult result = buildAggregationResult(timerange, timerange.to(), ImmutableList.of("one", "two"));
|
||||
final ImmutableList<EventWithContext> eventsWithContext = searchUtils.eventsFromAggregationResult(eventFactory, parameters, result, (event) -> {});
|
||||
@@ -718,8 +652,7 @@ public class AggregationSearchUtilsTest {
|
||||
searchFactory,
|
||||
eventStreamService,
|
||||
messageFactory,
|
||||
permittedStreams,
|
||||
templateEngine
|
||||
permittedStreams
|
||||
);
|
||||
|
||||
final AggregationResult result = AggregationResult.builder()
|
||||
|
||||
@@ -0,0 +1,114 @@
|
||||
/*
|
||||
* Copyright (C) 2020 Graylog, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*/
|
||||
package org.graylog.events.processor.modifier;
|
||||
|
||||
import com.floreysoft.jmte.Engine;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.graylog.events.event.EventWithContext;
|
||||
import org.graylog.events.event.TestEvent;
|
||||
import org.graylog.events.notifications.EventNotificationSettings;
|
||||
import org.graylog.events.processor.EventDefinitionDto;
|
||||
import org.graylog.events.processor.aggregation.AggregationConditions;
|
||||
import org.graylog.events.processor.aggregation.AggregationEventProcessorConfig;
|
||||
import org.graylog.plugins.views.search.searchfilters.model.UsedSearchFilter;
|
||||
import org.graylog.plugins.views.search.searchtypes.pivot.SeriesSpec;
|
||||
import org.graylog2.plugin.Message;
|
||||
import org.graylog2.plugin.MessageFactory;
|
||||
import org.graylog2.plugin.TestMessageFactory;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class EventSummaryModifierTest {
|
||||
private static final int SEARCH_WINDOW_MS = 30000;
|
||||
private static final String QUERY_STRING = "aQueryString";
|
||||
private final MessageFactory messageFactory = new TestMessageFactory();
|
||||
|
||||
@Test
|
||||
void appliesCustomEventSummaryTemplateFromMessageContext() throws Exception {
|
||||
final EventDefinitionDto eventDefinitionDto = buildEventDefinitionDto(ImmutableSet.of(), ImmutableList.of(), null, emptyList())
|
||||
.toBuilder()
|
||||
.eventSummaryTemplate("${source.user_name} failed to log in on ${source.host_name}")
|
||||
.build();
|
||||
|
||||
final EventSummaryModifier modifier = new EventSummaryModifier(new Engine());
|
||||
|
||||
final DateTime now = DateTime.now(DateTimeZone.UTC);
|
||||
final TestEvent event = new TestEvent(now);
|
||||
final Message message = messageFactory.createMessage("message", "src", now);
|
||||
message.addField("user_name", "test_user");
|
||||
message.addField("host_name", "host0001");
|
||||
message.addField("message", "message");
|
||||
|
||||
final EventWithContext eventWithContext = EventWithContext.create(event, message);
|
||||
modifier.accept(eventWithContext, eventDefinitionDto);
|
||||
|
||||
assertThat(event.getMessage()).isEqualTo("test_user failed to log in on host0001");
|
||||
}
|
||||
|
||||
@Test
|
||||
void appliesCustomEventSummaryTemplateFromAggregationMessageContext() throws Exception {
|
||||
final EventDefinitionDto eventDefinitionDto = buildEventDefinitionDto(ImmutableSet.of(), ImmutableList.of(), null, emptyList())
|
||||
.toBuilder()
|
||||
.eventSummaryTemplate("Aggregation on ${source.group_field_one} and ${source.group_field_two} has source count of ${source.aggregation_value_count_source}")
|
||||
.build();
|
||||
|
||||
final EventSummaryModifier modifier = new EventSummaryModifier(new Engine());
|
||||
|
||||
final DateTime now = DateTime.now(DateTimeZone.UTC);
|
||||
final TestEvent event = new TestEvent(now);
|
||||
final Message message = messageFactory.createMessage("message", "src", now);
|
||||
message.addField("group_field_one", "one");
|
||||
message.addField("group_field_two", "two");
|
||||
message.addField("aggregation_value_count_source", 42.0d);
|
||||
|
||||
final EventWithContext eventWithContext = EventWithContext.create(event, message);
|
||||
modifier.accept(eventWithContext, eventDefinitionDto);
|
||||
|
||||
assertThat(event.getMessage()).isEqualTo("Aggregation on one and two has source count of 42.0");
|
||||
}
|
||||
|
||||
private EventDefinitionDto buildEventDefinitionDto(
|
||||
Set<String> testStreams, List<SeriesSpec> testSeries, AggregationConditions testConditions, List<UsedSearchFilter> filters) {
|
||||
return EventDefinitionDto.builder()
|
||||
.id("dto-id-1")
|
||||
.title("Test Aggregation")
|
||||
.description("A test aggregation event processors")
|
||||
.priority(1)
|
||||
.alert(false)
|
||||
.notificationSettings(EventNotificationSettings.withGracePeriod(60000))
|
||||
.config(AggregationEventProcessorConfig.builder()
|
||||
.query(QUERY_STRING)
|
||||
.filters(filters)
|
||||
.streams(testStreams)
|
||||
.groupBy(ImmutableList.of("group_field_one", "group_field_two"))
|
||||
.series(testSeries)
|
||||
.conditions(testConditions)
|
||||
.searchWithinMs(SEARCH_WINDOW_MS)
|
||||
.executeEveryMs(SEARCH_WINDOW_MS)
|
||||
.build())
|
||||
.keySpec(ImmutableList.of())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user