diff --git a/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreter.java b/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreter.java index e12c630e12..8321121628 100644 --- a/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreter.java +++ b/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreter.java @@ -141,8 +141,10 @@ public class PipelineInterpreter implements MessageProcessor { for (Message message : currentSet) { final String msgId = message.getId(); - // this makes a copy of the stream IDs, which is mutated later in updateStreamBlacklist - // it serves as a worklist, to keep track of which tuples need to be re-run again + // getStreamsView() returns a live, unmodifiable view of the message's streams (not a copy). + // We immediately extract the stream IDs into a mutable set that serves as a worklist + // to keep track of which tuples need to be re-run again. + // The worklist is mutated later in updateStreamBlacklist. final Set currentStreams = message.getStreamsView(); final Set initialStreamIds = new HashSet<>(currentStreams.size()); for (final Stream stream : currentStreams) {