Streamline the pipeline metadata handling (#25021)

* Eliminate redundant work; improve robustness

* Improve log messages

---------

Co-authored-by: Florian Petersen <188503754+fpetersen-gl@users.noreply.github.com>
This commit is contained in:
Patrick Mann
2026-02-25 10:10:02 +01:00
committed by GitHub
parent 9207ad044b
commit 5d2c51a09b
6 changed files with 56 additions and 62 deletions

View File

@@ -112,10 +112,16 @@ public class ConfigurationStateUpdater {
return newState;
}
// Metadata updates are best-effort: metadata is a derived cache rebuilt from scratch on every
// restart, so a failed incremental update will be corrected on next boot.
/**
 * Calls the no-argument {@code reloadAndSave()} and then, only when
 * {@code configuration.isLeader()} is true, forwards the rule change to the metadata updater.
 * An exception thrown inside the try block is logged at WARN level and does not prevent the
 * state from being returned.
 *
 * @param event the rule change event that triggered the reload
 * @return the state obtained from {@code reloadAndSave()}
 */
private PipelineInterpreter.State reloadAndSave(RulesChangedEvent event) {
final PipelineInterpreter.State state = reloadAndSave();
if (configuration.isLeader()) { // avoid duplicate work and possible inconsistencies
metadataUpdater.handleRuleChanges(event, state, pipelineResolver, pipelineMetricRegistry);
try {
metadataUpdater.handleRuleChanges(event, state);
} catch (Exception e) {
// NOTE(review): only e.getMessage() is logged, so the stack trace is dropped; consider
// passing e itself as the final argument so the cause is preserved in the log.
log.warn("Failed to update pipeline metadata for rule changes: {} {}", event, e.getMessage());
}
}
return state;
}
@@ -123,7 +129,11 @@ public class ConfigurationStateUpdater {
/**
 * Calls the no-argument {@code reloadAndSave()} and then, only when
 * {@code configuration.isLeader()} is true, forwards the pipeline change to the metadata updater.
 * An exception thrown inside the try block is logged at WARN level and does not prevent the
 * state from being returned.
 *
 * @param event the pipeline change event that triggered the reload
 * @return the state obtained from {@code reloadAndSave()}
 */
private PipelineInterpreter.State reloadAndSave(PipelinesChangedEvent event) {
final PipelineInterpreter.State state = reloadAndSave();
if (configuration.isLeader()) { // avoid duplicate work and possible inconsistencies
metadataUpdater.handlePipelineChanges(event, state, pipelineResolver, pipelineMetricRegistry);
try {
metadataUpdater.handlePipelineChanges(event, state);
} catch (Exception e) {
// NOTE(review): only e.getMessage() is logged, so the stack trace is dropped; consider
// passing e itself as the final argument so the cause is preserved in the log.
log.warn("Failed to update pipeline metadata for pipeline changes: {} {}", event, e.getMessage());
}
}
return state;
}
@@ -131,7 +141,11 @@ public class ConfigurationStateUpdater {
/**
 * Calls the no-argument {@code reloadAndSave()} and then, only when
 * {@code configuration.isLeader()} is true, forwards the connection change to the metadata updater.
 * An exception thrown inside the try block is logged at WARN level and does not prevent the
 * state from being returned.
 *
 * @param event the pipeline connection change event that triggered the reload
 * @return the state obtained from {@code reloadAndSave()}
 */
private PipelineInterpreter.State reloadAndSave(PipelineConnectionsChangedEvent event) {
final PipelineInterpreter.State state = reloadAndSave();
if (configuration.isLeader()) { // avoid duplicate work and possible inconsistencies
metadataUpdater.handleConnectionChanges(event, state, pipelineResolver, pipelineMetricRegistry);
try {
metadataUpdater.handleConnectionChanges(event, state);
} catch (Exception e) {
// NOTE(review): only e.getMessage() is logged, so the stack trace is dropped; consider
// passing e itself as the final argument so the cause is preserved in the log.
log.warn("Failed to update pipeline metadata for connection changes: {} {}", event, e.getMessage());
}
}
return state;
}
@@ -139,7 +153,11 @@ public class ConfigurationStateUpdater {
/**
 * Calls the no-argument {@code reloadAndSave()} and then, only when
 * {@code configuration.isLeader()} is true, forwards the input deletion to the metadata updater.
 * An exception thrown inside the try block is logged at WARN level and does not prevent the
 * state from being returned.
 *
 * @param event the input deletion event that triggered the reload
 * @return the state obtained from {@code reloadAndSave()}
 */
private PipelineInterpreter.State reloadAndSave(InputDeletedEvent event) {
final PipelineInterpreter.State state = reloadAndSave();
if (configuration.isLeader()) { // avoid duplicate work and possible inconsistencies
metadataUpdater.handleInputDeleted(event, state, pipelineResolver, pipelineMetricRegistry);
try {
metadataUpdater.handleInputDeleted(event, state);
} catch (Exception e) {
// NOTE(review): only e.getMessage() is logged, so the stack trace is dropped; consider
// passing e itself as the final argument so the cause is preserved in the log.
log.warn("Failed to update pipeline metadata for input deletion: {} {}", event, e.getMessage());
}
}
return state;
}

View File

@@ -88,13 +88,11 @@ public class PipelineAnalyzer {
PipelineResolver resolver,
List<PipelineRulesMetadataDao> ruleRecords) {
final ImmutableMap<String, Pipeline> pipelines = resolver.resolvePipelines(pipelineMetricRegistry);
final ImmutableMap<String, Pipeline> functions = resolver.resolveFunctions(pipelines.values(), pipelineMetricRegistry);
return analyzePipelines(pipelines, functions, ruleRecords);
return analyzePipelines(pipelines, ruleRecords);
}
public Map<String, Set<PipelineInputsMetadataDao.MentionedInEntry>> analyzePipelines(
ImmutableMap<String, Pipeline> pipelines,
ImmutableMap<String, Pipeline> functions,
List<PipelineRulesMetadataDao> ruleRecords) {
final Map<String, Set<PipelineInputsMetadataDao.MentionedInEntry>> inputMentions = new HashMap<>();
@@ -112,7 +110,7 @@ public class PipelineAnalyzer {
.map(PipelineConnections::streamId)
.collect(Collectors.toSet());
Set<Stage> stages = functions.get(pipeline.id()).stages();
Set<Stage> stages = pipeline.stages();
if (stages != null) {
for (Stage stage : stages) {
List<Rule> stageRules = stage.getRules();

View File

@@ -81,21 +81,21 @@ public class PipelineMetadataUpdater {
eventBus.register(this);
}
/**
 * Handles a pipeline change event: first runs the two delete steps for the event
 * ({@code deletePipelineEntries} and {@code deleteInputMentionsForPipelines}), then passes the
 * pipelines returned by {@code affectedPipelines(event)} to {@code handleUpdates}.
 *
 * @param event the pipeline change event
 * @param state interpreter state handed through to {@code handleUpdates}
 */
public void handlePipelineChanges(PipelinesChangedEvent event, PipelineInterpreter.State state, PipelineResolver resolver, PipelineMetricRegistry metricRegistry) {
public void handlePipelineChanges(PipelinesChangedEvent event, PipelineInterpreter.State state) {
deletePipelineEntries(event);
deleteInputMentionsForPipelines(event);
Set<PipelineDao> pipelineDaos = affectedPipelines(event);
handleUpdates(pipelineDaos, state, resolver, metricRegistry);
handleUpdates(pipelineDaos, state);
}
/**
 * Handles a pipeline connection change event by passing the pipelines returned by
 * {@code affectedPipelines(event)} to {@code handleUpdates}. No delete step is run here.
 *
 * @param event the pipeline connection change event
 * @param state interpreter state handed through to {@code handleUpdates}
 */
public void handleConnectionChanges(PipelineConnectionsChangedEvent event, PipelineInterpreter.State state, PipelineResolver resolver, PipelineMetricRegistry metricRegistry) {
public void handleConnectionChanges(PipelineConnectionsChangedEvent event, PipelineInterpreter.State state) {
Set<PipelineDao> pipelineDaos = affectedPipelines(event);
handleUpdates(pipelineDaos, state, resolver, metricRegistry);
handleUpdates(pipelineDaos, state);
}
/**
 * Handles a rule change event: runs {@code deleteInputMentionsForRules} for the event, then
 * passes the pipelines returned by {@code affectedPipelines(event)} to {@code handleUpdates}.
 *
 * @param event the rule change event
 * @param state interpreter state handed through to {@code handleUpdates}
 */
public void handleRuleChanges(RulesChangedEvent event, PipelineInterpreter.State state, PipelineResolver resolver, PipelineMetricRegistry metricRegistry) {
public void handleRuleChanges(RulesChangedEvent event, PipelineInterpreter.State state) {
deleteInputMentionsForRules(event);
handleUpdates(affectedPipelines(event), state, resolver, metricRegistry);
handleUpdates(affectedPipelines(event), state);
}
/**
@@ -103,8 +103,8 @@ public class PipelineMetadataUpdater {
* - remove the metadata record for that input
* - pipelines that referenced that input need to be re-analyzed to potentially reset the has_input_references flag
*/
public void handleInputDeleted(InputDeletedEvent event, PipelineInterpreter.State state, PipelineResolver resolver, PipelineMetricRegistry metricRegistry) {
handleUpdates(affectedPipelines(event), state, resolver, metricRegistry);
public void handleInputDeleted(InputDeletedEvent event, PipelineInterpreter.State state) {
// Affected pipelines are resolved and re-analyzed before the input's record is deleted below.
// NOTE(review): presumably affectedPipelines(event) relies on that record still existing — confirm.
handleUpdates(affectedPipelines(event), state);
inputsMetadataService.deleteInput(event.inputId());
}
@@ -119,13 +119,10 @@ public class PipelineMetadataUpdater {
}
protected void handleUpdates(Set<PipelineDao> pipelineDaos,
PipelineInterpreter.State state,
PipelineResolver resolver,
PipelineMetricRegistry metricRegistry) {
PipelineInterpreter.State state) {
ImmutableMap<String, Pipeline> pipelines = affectedPipelinesAsMap(pipelineDaos, state);
ImmutableMap<String, Pipeline> functions = resolver.resolveFunctions(pipelines.values(), metricRegistry);
List<PipelineRulesMetadataDao> ruleRecords = new ArrayList<>();
Map<String, Set<PipelineInputsMetadataDao.MentionedInEntry>> inputMentions = pipelineAnalyzer.analyzePipelines(pipelines, functions, ruleRecords);
Map<String, Set<PipelineInputsMetadataDao.MentionedInEntry>> inputMentions = pipelineAnalyzer.analyzePipelines(pipelines, ruleRecords);
inputsMetadataService.save(inputMentions, true);
pipelineMetadataService.save(ruleRecords, true);

View File

@@ -30,7 +30,6 @@ import org.graylog.plugins.pipelineprocessor.processors.PipelineAnalyzer;
import org.graylog.plugins.pipelineprocessor.processors.PipelineResolver;
import org.graylog.plugins.pipelineprocessor.processors.PipelineResolverConfig;
import org.graylog2.database.MongoConnection;
import org.graylog2.plugin.cluster.ClusterConfigService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,20 +37,18 @@ import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import static org.graylog.plugins.pipelineprocessor.db.mongodb.MongoDbInputsMetadataService.INPUTS_COLLECTION_NAME;
import static org.graylog.plugins.pipelineprocessor.db.mongodb.MongoDbPipelineMetadataService.RULES_COLLECTION_NAME;
/**
* Migration to create the pipeline metadata collections, if any of them do not exist yet.
* Updated to include information about routing rules and routed streams.
* Rebuilds the pipeline metadata collections on every startup. This ensures metadata stays in sync with the
* primary pipeline, rule, and connection data even if incremental updates were missed or failed.
*/
public class V20251222123500_CreatePipelineMetadata extends Migration {
private static final Logger LOG = LoggerFactory.getLogger(V20251222123500_CreatePipelineMetadata.class);
private final ClusterConfigService configService;
private final MongoDatabase db;
private final PipelineResolver pipelineResolver;
private final PipelineAnalyzer pipelineAnalyzer;
@@ -59,8 +56,7 @@ public class V20251222123500_CreatePipelineMetadata extends Migration {
private final MongoDbInputsMetadataService inputsMetadataService;
@Inject
public V20251222123500_CreatePipelineMetadata(ClusterConfigService configService,
MongoConnection mongoConnection,
public V20251222123500_CreatePipelineMetadata(MongoConnection mongoConnection,
MongoDbRuleService ruleService,
MongoDbPipelineMetadataService pipelineMetadataService,
MongoDbInputsMetadataService inputsMetadataService,
@@ -69,7 +65,6 @@ public class V20251222123500_CreatePipelineMetadata extends Migration {
PipelineStreamConnectionsService pipelineStreamConnectionsService,
PipelineRuleParser pipelineRuleParser,
PipelineResolver.Factory pipelineResolverFactory) {
this.configService = configService;
this.db = mongoConnection.getMongoDatabase();
this.pipelineMetadataService = pipelineMetadataService;
this.inputsMetadataService = inputsMetadataService;
@@ -89,16 +84,17 @@ public class V20251222123500_CreatePipelineMetadata extends Migration {
return ZonedDateTime.parse("2025-12-22T12:35:00Z");
}
// This migration intentionally runs on every server restart (no MigrationCompleted guard).
// Pipeline metadata is a derived cache built from pipelines, rules, and stream connections.
// Incremental updates can silently fail or drift out of sync due to lost events, exceptions,
// or partial writes. Rebuilding from scratch on startup guarantees consistency.
@Override
public void upgrade() {
if (migrationAlreadyApplied()) {
return;
}
doUpgrade();
}
private void createMetadata() {
LOG.info("Creating pipeline metadata collection.");
LOG.info("Rebuilding pipeline metadata collections.");
final List<PipelineRulesMetadataDao> ruleRecords = new ArrayList<>();
final Map<String, Set<PipelineInputsMetadataDao.MentionedInEntry>> inputMentions =
pipelineAnalyzer.analyzePipelines(pipelineResolver, ruleRecords);
@@ -111,18 +107,5 @@ public class V20251222123500_CreatePipelineMetadata extends Migration {
db.getCollection(RULES_COLLECTION_NAME).drop();
db.getCollection(INPUTS_COLLECTION_NAME).drop();
createMetadata();
markMigrationApplied();
}
private boolean migrationAlreadyApplied() {
return Objects.nonNull(configService.get(V20251222123500_CreatePipelineMetadata.MigrationCompleted.class));
}
// The second migration marker indicates that the schema has been upgraded to include the routed_streams field
private void markMigrationApplied() {
configService.write(new V20251222123500_CreatePipelineMetadata.MigrationCompleted());
}
public record MigrationCompleted() {}
}

View File

@@ -82,7 +82,7 @@ class PipelineAnalyzerTest {
// With no pipelines and no rule records, the analyzer is expected to report no input mentions.
@Test
void empty() {
Map<String, Set<PipelineInputsMetadataDao.MentionedInEntry>> result = pipelineAnalyzer.analyzePipelines(
ImmutableMap.of(), ImmutableMap.of(), List.of()
ImmutableMap.of(), List.of()
);
assertTrue(result.isEmpty());
}
@@ -92,7 +92,7 @@ class PipelineAnalyzerTest {
Pipeline pipeline1 = testUtil.createPipelineWithRules("pipeline1", List.of(testUtil.ALWAYS_TRUE));
Map<String, Set<PipelineInputsMetadataDao.MentionedInEntry>> result = pipelineAnalyzer.analyzePipelines(
ImmutableMap.of(pipeline1.id(), pipeline1), ImmutableMap.of(pipeline1.id(), pipeline1), ruleRecords);
ImmutableMap.of(pipeline1.id(), pipeline1), ruleRecords);
assertTrue(result.isEmpty());
assertTrue(ruleRecords.stream().anyMatch(dao ->
@@ -106,7 +106,7 @@ class PipelineAnalyzerTest {
Pipeline pipeline1 = testUtil.createPipelineWithRules("pipeline1", List.of(testUtil.REMOVE_FIELD));
Map<String, Set<PipelineInputsMetadataDao.MentionedInEntry>> result = pipelineAnalyzer.analyzePipelines(
ImmutableMap.of(pipeline1.id(), pipeline1), ImmutableMap.of(pipeline1.id(), pipeline1), ruleRecords);
ImmutableMap.of(pipeline1.id(), pipeline1), ruleRecords);
assertTrue(result.isEmpty());
assertTrue(ruleRecords.stream().anyMatch(dao ->
@@ -121,7 +121,7 @@ class PipelineAnalyzerTest {
Pipeline pipeline1 = testUtil.createPipelineWithRules("pipeline1", List.of(testUtil.FROM_INPUT));
Map<String, Set<PipelineInputsMetadataDao.MentionedInEntry>> result = pipelineAnalyzer.analyzePipelines(
ImmutableMap.of(pipeline1.id(), pipeline1), ImmutableMap.of(pipeline1.id(), pipeline1), ruleRecords);
ImmutableMap.of(pipeline1.id(), pipeline1), ruleRecords);
assertTrue(result.containsKey(INPUT_ID));
Set<PipelineInputsMetadataDao.MentionedInEntry> mentions = result.get(INPUT_ID);
@@ -141,7 +141,7 @@ class PipelineAnalyzerTest {
Pipeline pipeline1 = testUtil.createPipelineWithRules("pipeline1", List.of(testUtil.GL2_SOURCE_INPUT));
Map<String, Set<PipelineInputsMetadataDao.MentionedInEntry>> result = pipelineAnalyzer.analyzePipelines(
ImmutableMap.of(pipeline1.id(), pipeline1), ImmutableMap.of(pipeline1.id(), pipeline1), ruleRecords);
ImmutableMap.of(pipeline1.id(), pipeline1), ruleRecords);
assertTrue(result.containsKey(INPUT_ID));
Set<PipelineInputsMetadataDao.MentionedInEntry> mentions = result.get(INPUT_ID);
@@ -169,7 +169,7 @@ class PipelineAnalyzerTest {
when(streamService.loadAllByTitle(STREAM3_TITLE)).thenReturn(List.of(stream3));
pipelineAnalyzer.analyzePipelines(
ImmutableMap.of(pipeline1.id(), pipeline1), ImmutableMap.of(pipeline1.id(), pipeline1), ruleRecords);
ImmutableMap.of(pipeline1.id(), pipeline1), ruleRecords);
assertTrue(ruleRecords.stream().anyMatch(dao ->
dao.pipelineId().equals(pipeline1.id())
@@ -187,7 +187,7 @@ class PipelineAnalyzerTest {
Pipeline pipeline = createPipelineWithFailingRule();
Map<String, Set<PipelineInputsMetadataDao.MentionedInEntry>> result = pipelineAnalyzer.analyzePipelines(
ImmutableMap.of(pipeline.id(), pipeline), ImmutableMap.of(pipeline.id(), pipeline), ruleRecords);
ImmutableMap.of(pipeline.id(), pipeline), ruleRecords);
Set<PipelineInputsMetadataDao.MentionedInEntry> mentions = result.get(INPUT_ID);
assertTrue(mentions.stream().anyMatch(entry -> entry.ruleId().equals(FROM_INPUT_ID)));

View File

@@ -52,8 +52,6 @@ class PipelineMetadataUpdaterTest {
private PipelineMetadataUpdater updater;
private final PipelineInterpreter.State state = mock(PipelineInterpreter.State.class);
private final PipelineResolver resolver = mock(PipelineResolver.class);
private final PipelineMetricRegistry metricRegistry = mock(PipelineMetricRegistry.class);
private final PipelineAnalyzer pipelineAnalyzer = mock(PipelineAnalyzer.class);
private final EventBus eventBus = mock(EventBus.class);
@@ -82,13 +80,13 @@ class PipelineMetadataUpdaterTest {
void testHandlePipelineChanges() {
// The event carries "id1" in its first set and "id2" in its second.
PipelinesChangedEvent event = PipelinesChangedEvent.create(Set.of("id1"), Set.of("id2"));
updater.handlePipelineChanges(event, state, resolver, metricRegistry);
updater.handlePipelineChanges(event, state);
// "id1" is expected to be removed from both metadata services.
verify(pipelineMetadataService).delete(Set.of("id1"));
verify(inputsMetadataService).deleteInputMentionsByPipelineId("id1");
// "id2" is expected to be among the pipelines handed to handleUpdates.
ArgumentCaptor<Set<PipelineDao>> pipelineCaptor = ArgumentCaptor.forClass(Set.class);
verify(updater).handleUpdates(pipelineCaptor.capture(), any(), any(), any());
verify(updater).handleUpdates(pipelineCaptor.capture(), any());
assertTrue(pipelineCaptor.getValue().stream().anyMatch(p -> p.id().equals("id2")));
}
@@ -96,10 +94,10 @@ class PipelineMetadataUpdaterTest {
void testHandleConnectionChanges() {
PipelineConnectionsChangedEvent event = PipelineConnectionsChangedEvent.create("stream_id", Set.of("id1"));
updater.handleConnectionChanges(event, state, resolver, metricRegistry);
updater.handleConnectionChanges(event, state);
// The pipeline id passed in the event ("id1") is expected to be among the pipelines handed to handleUpdates.
ArgumentCaptor<Set<PipelineDao>> pipelineCaptor = ArgumentCaptor.forClass(Set.class);
verify(updater).handleUpdates(pipelineCaptor.capture(), any(), any(), any());
verify(updater).handleUpdates(pipelineCaptor.capture(), any());
assertTrue(pipelineCaptor.getValue().stream().anyMatch(p -> p.id().equals("id1")));
}
@@ -113,12 +111,12 @@ class PipelineMetadataUpdaterTest {
"title1", "description1", "source1", null, null)))
.when(updater).affectedPipelines(event);
updater.handleRuleChanges(event, state, resolver, metricRegistry);
updater.handleRuleChanges(event, state);
verify(inputsMetadataService).deleteInputMentionsByRuleId("rule1");
ArgumentCaptor<Set<PipelineDao>> pipelineCaptor = ArgumentCaptor.forClass(Set.class);
verify(updater).handleUpdates(pipelineCaptor.capture(), any(), any(), any());
verify(updater).handleUpdates(pipelineCaptor.capture(), any());
assertTrue(pipelineCaptor.getValue().stream().anyMatch(p -> p.id().equals("pipeline1")));
}
@@ -135,7 +133,7 @@ class PipelineMetadataUpdaterTest {
))
.build()
);
updater.handleInputDeleted(event, state, resolver, metricRegistry);
updater.handleInputDeleted(event, state);
verify(inputsMetadataService).deleteInput("input1");
}