Migrate event subscribers to ClusterEventBus for local-only delivery

Switch InputRoutingService, DeletedStreamNotificationListener,
StreamDestinationFilterService, and StartPageCleanupListener from the
server EventBus to ClusterEventBus.registerClusterEventSubscriber() so
their handlers only fire on the originating node instead of being
replicated to every node via ClusterEventPeriodical. DataNodeEventService
intentionally stays on the server EventBus (added explanatory comment).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Dennis Oelkers
2026-03-11 15:14:20 +01:00
parent 2f5bcf252d
commit 28ea72502c
8 changed files with 28 additions and 24 deletions

View File

@@ -31,6 +31,9 @@ public class DataNodeEventService {
@Inject
public DataNodeEventService(EventBus eventBus, NotificationService notificationService) {
this.notificationService = notificationService;
// NOTE: Intentionally uses the server EventBus (not ClusterEventBus) because
// DataNodeNotficationEvent originates from DataNode processes, not from a Graylog server.
// publishIfFirst() provides idempotency, so redundant delivery across nodes is harmless.
eventBus.register(this);
}

View File

@@ -16,10 +16,10 @@
*/
package org.graylog2.inputs;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.graylog2.events.ClusterEventBus;
import org.graylog.plugins.pipelineprocessor.db.PipelineDao;
import org.graylog.plugins.pipelineprocessor.db.PipelineService;
import org.graylog.plugins.pipelineprocessor.db.RuleDao;
@@ -67,14 +67,15 @@ public class InputRoutingService {
StreamService streamService,
PipelineService pipelineService,
PipelineRuleParser pipelineRuleParser,
EventBus eventBus) {
ClusterEventBus clusterEventBus) {
this.ruleService = ruleService;
this.inputService = inputService;
this.streamService = streamService;
this.pipelineService = pipelineService;
this.pipelineRuleParser = pipelineRuleParser;
eventBus.register(this);
// Subscribe on ClusterEventBus to only receive events originating on this node (local-only delivery).
clusterEventBus.registerClusterEventSubscriber(this);
}
/**

View File

@@ -16,9 +16,9 @@
*/
package org.graylog2.notifications;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import jakarta.inject.Inject;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.streams.events.StreamDeletedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,9 +30,10 @@ public class DeletedStreamNotificationListener {
private final NotificationService notificationService;
@Inject
public DeletedStreamNotificationListener(EventBus eventBus, NotificationService notificationService) {
public DeletedStreamNotificationListener(ClusterEventBus clusterEventBus, NotificationService notificationService) {
this.notificationService = notificationService;
eventBus.register(this);
// Subscribe on ClusterEventBus to only receive events originating on this node (local-only delivery).
clusterEventBus.registerClusterEventSubscriber(this);
}
@Subscribe

View File

@@ -18,7 +18,6 @@ package org.graylog2.streams.filters;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import org.graylog2.database.MongoCollection;
import com.mongodb.client.model.Accumulators;
@@ -81,7 +80,6 @@ public class StreamDestinationFilterService {
@Inject
public StreamDestinationFilterService(MongoCollections mongoCollections,
ClusterEventBus clusterEventBus,
EventBus eventBus,
Optional<DestinationFilterCreationValidator> optionalDestinationFilterCreationValidator) {
this.collection = mongoCollections.collection(COLLECTION, StreamDestinationFilterRuleDTO.class);
this.paginationHelper = mongoCollections.paginationHelper(collection);
@@ -93,7 +91,8 @@ public class StreamDestinationFilterService {
collection.createIndex(Indexes.ascending(FIELD_DESTINATION_TYPE));
collection.createIndex(Indexes.ascending(FIELD_STATUS));
eventBus.register(this);
// Subscribe on ClusterEventBus to only receive events originating on this node (local-only delivery).
clusterEventBus.registerClusterEventSubscriber(this);
}
private Bson parseQuery(String queryString) {

View File

@@ -16,9 +16,9 @@
*/
package org.graylog2.users;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import jakarta.inject.Inject;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.dashboards.events.DashboardDeletedEvent;
import org.graylog2.plugin.database.ValidationException;
import org.graylog2.rest.models.users.requests.DashboardStartPage;
@@ -35,10 +35,11 @@ public class StartPageCleanupListener {
private final UserService userService;
@Inject
public StartPageCleanupListener(EventBus serverEventBus,
public StartPageCleanupListener(ClusterEventBus clusterEventBus,
UserService userService) {
this.userService = userService;
serverEventBus.register(this);
// Subscribe on ClusterEventBus to only receive events originating on this node (local-only delivery).
clusterEventBus.registerClusterEventSubscriber(this);
}
@Subscribe

View File

@@ -17,7 +17,6 @@
package org.graylog2.inputs.routing;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.eventbus.EventBus;
import org.apache.commons.io.IOUtils;
import org.assertj.core.api.Assertions;
import org.graylog.plugins.pipelineprocessor.db.PipelineDao;
@@ -31,6 +30,7 @@ import org.graylog.plugins.pipelineprocessor.rest.PipelineSource;
import org.graylog.plugins.pipelineprocessor.rest.PipelineUtils;
import org.graylog2.database.NotFoundException;
import org.graylog2.inputs.Input;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.inputs.InputRoutingService;
import org.graylog2.inputs.InputService;
import org.graylog2.plugin.streams.Stream;
@@ -67,7 +67,7 @@ class InputRoutingServiceTest {
final RuleService ruleService = mock(RuleService.class);
final PipelineService pipelineService = mock(PipelineService.class);
final PipelineRuleParser pipelineRuleParser = mock(PipelineRuleParser.class);
final EventBus eventBus = mock(EventBus.class);
final ClusterEventBus clusterEventBus = new ClusterEventBus();
final ObjectMapper objectMapper = new ObjectMapperProvider().get();
InputRoutingService inputRoutingService;
@@ -82,7 +82,7 @@ class InputRoutingServiceTest {
@BeforeEach
void setUp() {
inputRoutingService = new InputRoutingService(
ruleService, inputService, streamService, pipelineService, pipelineRuleParser, eventBus);
ruleService, inputService, streamService, pipelineService, pipelineRuleParser, clusterEventBus);
}
@Test

View File

@@ -16,8 +16,8 @@
*/
package org.graylog2.notifications;
import com.google.common.eventbus.EventBus;
import org.assertj.core.api.Assertions;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.streams.events.StreamDeletedEvent;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
@@ -31,11 +31,11 @@ class DeletedStreamNotificationListenerTest {
@Test
void testNotificationDeletion() {
EventBus eventBus = new EventBus();
final ClusterEventBus clusterEventBus = new ClusterEventBus();
final NotificationService notificationService = mockNotificationService("123", "456");
new DeletedStreamNotificationListener(eventBus, notificationService);
new DeletedStreamNotificationListener(clusterEventBus, notificationService);
eventBus.post(new StreamDeletedEvent("123", "stream title"));
clusterEventBus.post(new StreamDeletedEvent("123", "stream title"));
final ArgumentCaptor<Notification> argumentCaptor = ArgumentCaptor.forClass(Notification.class);
Mockito.verify(notificationService, Mockito.times(1)).destroy(argumentCaptor.capture());

View File

@@ -17,7 +17,6 @@
package org.graylog2.streams.filters;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.MoreExecutors;
import com.mongodb.client.model.Sorts;
import org.graylog.plugins.pipelineprocessor.rulebuilder.RuleBuilder;
@@ -49,12 +48,12 @@ class StreamDestinationFilterServiceTest {
@Mock
private DestinationFilterCreationValidator mockedFilterLicenseCheck;
private StreamDestinationFilterService service;
private EventBus eventBus;
private ClusterEventBus clusterEventBus;
@BeforeEach
void setUp(MongoCollections mongoCollections) {
this.eventBus = new EventBus("stream-destination-filter-service");
this.service = new StreamDestinationFilterService(mongoCollections, new ClusterEventBus(MoreExecutors.directExecutor()), eventBus, Optional.of(mockedFilterLicenseCheck));
this.clusterEventBus = new ClusterEventBus(MoreExecutors.directExecutor());
this.service = new StreamDestinationFilterService(mongoCollections, clusterEventBus, Optional.of(mockedFilterLicenseCheck));
}
@Test
@@ -314,7 +313,7 @@ class StreamDestinationFilterServiceTest {
optionalDto = service.findByIdForStream("54e3deadbeefdeadbeef1000", "54e3deadbeefdeadbeef0002");
assertThat(optionalDto).isPresent();
eventBus.post(new StreamDeletedEvent("54e3deadbeefdeadbeef1000", "Test Stream 1"));
clusterEventBus.post(new StreamDeletedEvent("54e3deadbeefdeadbeef1000", "Test Stream 1"));
var afterDeletionEvent = service.findByIdForStream("54e3deadbeefdeadbeef1000", "54e3deadbeefdeadbeef0000");
assertThat(afterDeletionEvent).isNotPresent();