diff --git a/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/processors/ConfigurationStateUpdater.java b/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/processors/ConfigurationStateUpdater.java index 647d7ed890..8dbb18bc69 100644 --- a/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/processors/ConfigurationStateUpdater.java +++ b/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/processors/ConfigurationStateUpdater.java @@ -112,10 +112,16 @@ public class ConfigurationStateUpdater { return newState; } + // Metadata updates are best-effort: metadata is a derived cache rebuilt from scratch on every + // restart, so a failed incremental update will be corrected on next boot. private PipelineInterpreter.State reloadAndSave(RulesChangedEvent event) { final PipelineInterpreter.State state = reloadAndSave(); if (configuration.isLeader()) { // avoid duplicate work and possible inconsistencies - metadataUpdater.handleRuleChanges(event, state, pipelineResolver, pipelineMetricRegistry); + try { + metadataUpdater.handleRuleChanges(event, state); + } catch (Exception e) { + log.warn("Failed to update pipeline metadata for rule changes: {} {}", event, e.getMessage()); + } } return state; } @@ -123,7 +129,11 @@ public class ConfigurationStateUpdater { private PipelineInterpreter.State reloadAndSave(PipelinesChangedEvent event) { final PipelineInterpreter.State state = reloadAndSave(); if (configuration.isLeader()) { // avoid duplicate work and possible inconsistencies - metadataUpdater.handlePipelineChanges(event, state, pipelineResolver, pipelineMetricRegistry); + try { + metadataUpdater.handlePipelineChanges(event, state); + } catch (Exception e) { + log.warn("Failed to update pipeline metadata for pipeline changes: {} {}", event, e.getMessage()); + } } return state; } @@ -131,7 +141,11 @@ public class ConfigurationStateUpdater { private PipelineInterpreter.State reloadAndSave(PipelineConnectionsChangedEvent event) { final PipelineInterpreter.State state = reloadAndSave(); if (configuration.isLeader()) { // avoid duplicate work and possible inconsistencies - metadataUpdater.handleConnectionChanges(event, state, pipelineResolver, pipelineMetricRegistry); + try { + metadataUpdater.handleConnectionChanges(event, state); + } catch (Exception e) { + log.warn("Failed to update pipeline metadata for connection changes: {} {}", event, e.getMessage()); + } } return state; } @@ -139,7 +153,11 @@ public class ConfigurationStateUpdater { private PipelineInterpreter.State reloadAndSave(InputDeletedEvent event) { final PipelineInterpreter.State state = reloadAndSave(); if (configuration.isLeader()) { // avoid duplicate work and possible inconsistencies - metadataUpdater.handleInputDeleted(event, state, pipelineResolver, pipelineMetricRegistry); + try { + metadataUpdater.handleInputDeleted(event, state); + } catch (Exception e) { + log.warn("Failed to update pipeline metadata for input deletion: {} {}", event, e.getMessage()); + } } return state; } diff --git a/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineAnalyzer.java b/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineAnalyzer.java index d206ff4987..1d5661b4a3 100644 --- a/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineAnalyzer.java +++ b/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineAnalyzer.java @@ -88,13 +88,11 @@ public class PipelineAnalyzer { PipelineResolver resolver, List ruleRecords) { final ImmutableMap pipelines = resolver.resolvePipelines(pipelineMetricRegistry); - final ImmutableMap functions = resolver.resolveFunctions(pipelines.values(), pipelineMetricRegistry); - return analyzePipelines(pipelines, functions, ruleRecords); + return analyzePipelines(pipelines, ruleRecords); } public Map> analyzePipelines( ImmutableMap pipelines, - ImmutableMap functions, List ruleRecords) { final Map> inputMentions = new HashMap<>(); @@ -112,7 +110,7 @@ public class PipelineAnalyzer { .map(PipelineConnections::streamId) .collect(Collectors.toSet()); - Set stages = functions.get(pipeline.id()).stages(); + Set stages = pipeline.stages(); if (stages != null) { for (Stage stage : stages) { List stageRules = stage.getRules(); diff --git a/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineMetadataUpdater.java b/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineMetadataUpdater.java index 9eb2a593c5..a0a44f7727 100644 --- a/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineMetadataUpdater.java +++ b/graylog2-server/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineMetadataUpdater.java @@ -81,21 +81,21 @@ public class PipelineMetadataUpdater { eventBus.register(this); } - public void handlePipelineChanges(PipelinesChangedEvent event, PipelineInterpreter.State state, PipelineResolver resolver, PipelineMetricRegistry metricRegistry) { + public void handlePipelineChanges(PipelinesChangedEvent event, PipelineInterpreter.State state) { deletePipelineEntries(event); deleteInputMentionsForPipelines(event); Set pipelineDaos = affectedPipelines(event); - handleUpdates(pipelineDaos, state, resolver, metricRegistry); + handleUpdates(pipelineDaos, state); } - public void handleConnectionChanges(PipelineConnectionsChangedEvent event, PipelineInterpreter.State state, PipelineResolver resolver, PipelineMetricRegistry metricRegistry) { + public void handleConnectionChanges(PipelineConnectionsChangedEvent event, PipelineInterpreter.State state) { Set pipelineDaos = affectedPipelines(event); - handleUpdates(pipelineDaos, state, resolver, metricRegistry); + handleUpdates(pipelineDaos, state); } - public void handleRuleChanges(RulesChangedEvent event, PipelineInterpreter.State state, PipelineResolver resolver, PipelineMetricRegistry metricRegistry) { + public void handleRuleChanges(RulesChangedEvent event, PipelineInterpreter.State state) { deleteInputMentionsForRules(event); - handleUpdates(affectedPipelines(event), state, resolver, metricRegistry); + handleUpdates(affectedPipelines(event), state); } /** @@ -103,8 +103,8 @@ public class PipelineMetadataUpdater { * - remove the metadata record for that input * - pipelines that referenced that input need to be re-analyzed to potentially reset the has_input_references flag */ - public void handleInputDeleted(InputDeletedEvent event, PipelineInterpreter.State state, PipelineResolver resolver, PipelineMetricRegistry metricRegistry) { - handleUpdates(affectedPipelines(event), state, resolver, metricRegistry); + public void handleInputDeleted(InputDeletedEvent event, PipelineInterpreter.State state) { + handleUpdates(affectedPipelines(event), state); inputsMetadataService.deleteInput(event.inputId()); } @@ -119,13 +119,10 @@ public class PipelineMetadataUpdater { } protected void handleUpdates(Set pipelineDaos, - PipelineInterpreter.State state, - PipelineResolver resolver, - PipelineMetricRegistry metricRegistry) { + PipelineInterpreter.State state) { ImmutableMap pipelines = affectedPipelinesAsMap(pipelineDaos, state); - ImmutableMap functions = resolver.resolveFunctions(pipelines.values(), metricRegistry); List ruleRecords = new ArrayList<>(); - Map> inputMentions = pipelineAnalyzer.analyzePipelines(pipelines, functions, ruleRecords); + Map> inputMentions = pipelineAnalyzer.analyzePipelines(pipelines, ruleRecords); inputsMetadataService.save(inputMentions, true); pipelineMetadataService.save(ruleRecords, true); diff --git a/graylog2-server/src/main/java/org/graylog2/migrations/V20251222123500_CreatePipelineMetadata.java b/graylog2-server/src/main/java/org/graylog2/migrations/V20251222123500_CreatePipelineMetadata.java index c2aee93406..3baff6c1c6 100644 --- a/graylog2-server/src/main/java/org/graylog2/migrations/V20251222123500_CreatePipelineMetadata.java +++ b/graylog2-server/src/main/java/org/graylog2/migrations/V20251222123500_CreatePipelineMetadata.java @@ -30,7 +30,6 @@ import org.graylog.plugins.pipelineprocessor.processors.PipelineAnalyzer; import org.graylog.plugins.pipelineprocessor.processors.PipelineResolver; import org.graylog.plugins.pipelineprocessor.processors.PipelineResolverConfig; import org.graylog2.database.MongoConnection; -import org.graylog2.plugin.cluster.ClusterConfigService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,20 +37,18 @@ import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import static org.graylog.plugins.pipelineprocessor.db.mongodb.MongoDbInputsMetadataService.INPUTS_COLLECTION_NAME; import static org.graylog.plugins.pipelineprocessor.db.mongodb.MongoDbPipelineMetadataService.RULES_COLLECTION_NAME; /** - * Migration to create the pipeline metadata collections, if any of them do not exist yet. - * Updated to include information routing rules and routed streams. + * Rebuilds the pipeline metadata collections on every startup. This ensures metadata stays in sync with the + * primary pipeline, rule, and connection data even if incremental updates were missed or failed. */ public class V20251222123500_CreatePipelineMetadata extends Migration { private static final Logger LOG = LoggerFactory.getLogger(V20251222123500_CreatePipelineMetadata.class); - private final ClusterConfigService configService; private final MongoDatabase db; private final PipelineResolver pipelineResolver; private final PipelineAnalyzer pipelineAnalyzer; @@ -59,8 +56,7 @@ public class V20251222123500_CreatePipelineMetadata extends Migration { private final MongoDbInputsMetadataService inputsMetadataService; @Inject - public V20251222123500_CreatePipelineMetadata(ClusterConfigService configService, - MongoConnection mongoConnection, + public V20251222123500_CreatePipelineMetadata(MongoConnection mongoConnection, MongoDbRuleService ruleService, MongoDbPipelineMetadataService pipelineMetadataService, MongoDbInputsMetadataService inputsMetadataService, @@ -69,7 +65,6 @@ public class V20251222123500_CreatePipelineMetadata extends Migration { PipelineStreamConnectionsService pipelineStreamConnectionsService, PipelineRuleParser pipelineRuleParser, PipelineResolver.Factory pipelineResolverFactory) { - this.configService = configService; this.db = mongoConnection.getMongoDatabase(); this.pipelineMetadataService = pipelineMetadataService; this.inputsMetadataService = inputsMetadataService; @@ -89,16 +84,17 @@ public class V20251222123500_CreatePipelineMetadata extends Migration { return ZonedDateTime.parse("2025-12-22T12:35:00Z"); } + // This migration intentionally runs on every server restart (no MigrationCompleted guard). + // Pipeline metadata is a derived cache built from pipelines, rules, and stream connections. + // Incremental updates can silently fail or drift out of sync due to lost events, exceptions, + // or partial writes. Rebuilding from scratch on startup guarantees consistency. @Override public void upgrade() { - if (migrationAlreadyApplied()) { - return; - } doUpgrade(); } private void createMetadata() { - LOG.info("Creating pipeline metadata collection."); + LOG.info("Rebuilding pipeline metadata collections."); final List ruleRecords = new ArrayList<>(); final Map> inputMentions = pipelineAnalyzer.analyzePipelines(pipelineResolver, ruleRecords); @@ -111,18 +107,5 @@ public class V20251222123500_CreatePipelineMetadata extends Migration { db.getCollection(RULES_COLLECTION_NAME).drop(); db.getCollection(INPUTS_COLLECTION_NAME).drop(); createMetadata(); - - markMigrationApplied(); } - - private boolean migrationAlreadyApplied() { - return Objects.nonNull(configService.get(V20251222123500_CreatePipelineMetadata.MigrationCompleted.class)); - } - - // The second migration marker indicates that schema has been upgraded to include routed_streams field - private void markMigrationApplied() { - configService.write(new V20251222123500_CreatePipelineMetadata.MigrationCompleted()); - } - - public record MigrationCompleted() {} } diff --git a/graylog2-server/src/test/java/org/graylog/plugins/pipelineprocessor/processors/PipelineAnalyzerTest.java b/graylog2-server/src/test/java/org/graylog/plugins/pipelineprocessor/processors/PipelineAnalyzerTest.java index 3c6ac825fe..28b6405a44 100644 --- a/graylog2-server/src/test/java/org/graylog/plugins/pipelineprocessor/processors/PipelineAnalyzerTest.java +++ b/graylog2-server/src/test/java/org/graylog/plugins/pipelineprocessor/processors/PipelineAnalyzerTest.java @@ -82,7 +82,7 @@ class PipelineAnalyzerTest { @Test void empty() { Map> result = pipelineAnalyzer.analyzePipelines( - ImmutableMap.of(), ImmutableMap.of(), List.of() + ImmutableMap.of(), List.of() ); assertTrue(result.isEmpty()); } @@ -92,7 +92,7 @@ class PipelineAnalyzerTest { Pipeline pipeline1 = testUtil.createPipelineWithRules("pipeline1", List.of(testUtil.ALWAYS_TRUE)); Map> result = pipelineAnalyzer.analyzePipelines( - ImmutableMap.of(pipeline1.id(), pipeline1), ImmutableMap.of(pipeline1.id(), pipeline1), ruleRecords); + ImmutableMap.of(pipeline1.id(), pipeline1), ruleRecords); assertTrue(result.isEmpty()); assertTrue(ruleRecords.stream().anyMatch(dao -> @@ -106,7 +106,7 @@ class PipelineAnalyzerTest { Pipeline pipeline1 = testUtil.createPipelineWithRules("pipeline1", List.of(testUtil.REMOVE_FIELD)); Map> result = pipelineAnalyzer.analyzePipelines( - ImmutableMap.of(pipeline1.id(), pipeline1), ImmutableMap.of(pipeline1.id(), pipeline1), ruleRecords); + ImmutableMap.of(pipeline1.id(), pipeline1), ruleRecords); assertTrue(result.isEmpty()); assertTrue(ruleRecords.stream().anyMatch(dao -> @@ -121,7 +121,7 @@ class PipelineAnalyzerTest { Pipeline pipeline1 = testUtil.createPipelineWithRules("pipeline1", List.of(testUtil.FROM_INPUT)); Map> result = pipelineAnalyzer.analyzePipelines( - ImmutableMap.of(pipeline1.id(), pipeline1), ImmutableMap.of(pipeline1.id(), pipeline1), ruleRecords); + ImmutableMap.of(pipeline1.id(), pipeline1), ruleRecords); assertTrue(result.containsKey(INPUT_ID)); Set mentions = result.get(INPUT_ID); @@ -141,7 +141,7 @@ class PipelineAnalyzerTest { Pipeline pipeline1 = testUtil.createPipelineWithRules("pipeline1", List.of(testUtil.GL2_SOURCE_INPUT)); Map> result = pipelineAnalyzer.analyzePipelines( - ImmutableMap.of(pipeline1.id(), pipeline1), ImmutableMap.of(pipeline1.id(), pipeline1), ruleRecords); + ImmutableMap.of(pipeline1.id(), pipeline1), ruleRecords); assertTrue(result.containsKey(INPUT_ID)); Set mentions = result.get(INPUT_ID); @@ -169,7 +169,7 @@ class PipelineAnalyzerTest { when(streamService.loadAllByTitle(STREAM3_TITLE)).thenReturn(List.of(stream3)); pipelineAnalyzer.analyzePipelines( - ImmutableMap.of(pipeline1.id(), pipeline1), ImmutableMap.of(pipeline1.id(), pipeline1), ruleRecords); + ImmutableMap.of(pipeline1.id(), pipeline1), ruleRecords); assertTrue(ruleRecords.stream().anyMatch(dao -> dao.pipelineId().equals(pipeline1.id()) @@ -187,7 +187,7 @@ class PipelineAnalyzerTest { Pipeline pipeline = createPipelineWithFailingRule(); Map> result = pipelineAnalyzer.analyzePipelines( - ImmutableMap.of(pipeline.id(), pipeline), ImmutableMap.of(pipeline.id(), pipeline), ruleRecords); + ImmutableMap.of(pipeline.id(), pipeline), ruleRecords); Set mentions = result.get(INPUT_ID); assertTrue(mentions.stream().anyMatch(entry -> entry.ruleId().equals(FROM_INPUT_ID))); diff --git a/graylog2-server/src/test/java/org/graylog/plugins/pipelineprocessor/processors/PipelineMetadataUpdaterTest.java b/graylog2-server/src/test/java/org/graylog/plugins/pipelineprocessor/processors/PipelineMetadataUpdaterTest.java index 8812d28fe7..f47686b622 100644 --- a/graylog2-server/src/test/java/org/graylog/plugins/pipelineprocessor/processors/PipelineMetadataUpdaterTest.java +++ b/graylog2-server/src/test/java/org/graylog/plugins/pipelineprocessor/processors/PipelineMetadataUpdaterTest.java @@ -52,8 +52,6 @@ class PipelineMetadataUpdaterTest { private PipelineMetadataUpdater updater; private final PipelineInterpreter.State state = mock(PipelineInterpreter.State.class); - private final PipelineResolver resolver = mock(PipelineResolver.class); - private final PipelineMetricRegistry metricRegistry = mock(PipelineMetricRegistry.class); private final PipelineAnalyzer pipelineAnalyzer = mock(PipelineAnalyzer.class); private final EventBus eventBus = mock(EventBus.class); @@ -82,13 +80,13 @@ class PipelineMetadataUpdaterTest { void testHandlePipelineChanges() { PipelinesChangedEvent event = PipelinesChangedEvent.create(Set.of("id1"), Set.of("id2")); - updater.handlePipelineChanges(event, state, resolver, metricRegistry); + updater.handlePipelineChanges(event, state); verify(pipelineMetadataService).delete(Set.of("id1")); verify(inputsMetadataService).deleteInputMentionsByPipelineId("id1"); ArgumentCaptor> pipelineCaptor = ArgumentCaptor.forClass(Set.class); - verify(updater).handleUpdates(pipelineCaptor.capture(), any(), any(), any()); + verify(updater).handleUpdates(pipelineCaptor.capture(), any()); assertTrue(pipelineCaptor.getValue().stream().anyMatch(p -> p.id().equals("id2"))); } @@ -96,10 +94,10 @@ class PipelineMetadataUpdaterTest { void testHandleConnectionChanges() { PipelineConnectionsChangedEvent event = PipelineConnectionsChangedEvent.create("stream_id", Set.of("id1")); - updater.handleConnectionChanges(event, state, resolver, metricRegistry); + updater.handleConnectionChanges(event, state); ArgumentCaptor> pipelineCaptor = ArgumentCaptor.forClass(Set.class); - verify(updater).handleUpdates(pipelineCaptor.capture(), any(), any(), any()); + verify(updater).handleUpdates(pipelineCaptor.capture(), any()); assertTrue(pipelineCaptor.getValue().stream().anyMatch(p -> p.id().equals("id1"))); } @@ -113,12 +111,12 @@ class PipelineMetadataUpdaterTest { "title1", "description1", "source1", null, null))) .when(updater).affectedPipelines(event); - updater.handleRuleChanges(event, state, resolver, metricRegistry); + updater.handleRuleChanges(event, state); verify(inputsMetadataService).deleteInputMentionsByRuleId("rule1"); ArgumentCaptor> pipelineCaptor = ArgumentCaptor.forClass(Set.class); - verify(updater).handleUpdates(pipelineCaptor.capture(), any(), any(), any()); + verify(updater).handleUpdates(pipelineCaptor.capture(), any()); assertTrue(pipelineCaptor.getValue().stream().anyMatch(p -> p.id().equals("pipeline1"))); } @@ -135,7 +133,7 @@ class PipelineMetadataUpdaterTest { )) .build() ); - updater.handleInputDeleted(event, state, resolver, metricRegistry); + updater.handleInputDeleted(event, state); verify(inputsMetadataService).deleteInput("input1"); }