mirror of
https://github.com/Graylog2/graylog2-server.git
synced 2026-03-13 09:32:21 +08:00
Rename ConfigurationStateUpdater to PipelineInterpreterStateUpdater
The class solely manages PipelineInterpreter.State — the new name better reflects its actual responsibility. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -24,7 +24,7 @@ import io.opentelemetry.instrumentation.annotations.WithSpan;
|
||||
import jakarta.inject.Inject;
|
||||
import org.graylog.plugins.pipelineprocessor.db.PipelineDao;
|
||||
import org.graylog.plugins.pipelineprocessor.db.PipelineService;
|
||||
import org.graylog.plugins.pipelineprocessor.processors.ConfigurationStateUpdater;
|
||||
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreterStateUpdater;
|
||||
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter;
|
||||
import org.graylog.plugins.pipelineprocessor.processors.listeners.NoopInterpreterListener;
|
||||
import org.graylog2.decorators.Decorator;
|
||||
@@ -46,7 +46,7 @@ public class PipelineProcessorMessageDecorator implements SearchResponseDecorato
|
||||
private static final String CONFIG_FIELD_PIPELINE = "pipeline";
|
||||
|
||||
private final PipelineInterpreter pipelineInterpreter;
|
||||
private final ConfigurationStateUpdater pipelineStateUpdater;
|
||||
private final PipelineInterpreterStateUpdater pipelineStateUpdater;
|
||||
private final MessageFactory messageFactory;
|
||||
private final ImmutableSet<String> pipelines;
|
||||
|
||||
@@ -93,7 +93,7 @@ public class PipelineProcessorMessageDecorator implements SearchResponseDecorato
|
||||
|
||||
@Inject
|
||||
public PipelineProcessorMessageDecorator(PipelineInterpreter pipelineInterpreter,
|
||||
ConfigurationStateUpdater pipelineStateUpdater,
|
||||
PipelineInterpreterStateUpdater pipelineStateUpdater,
|
||||
MessageFactory messageFactory,
|
||||
@Assisted Decorator decorator) {
|
||||
this.pipelineInterpreter = pipelineInterpreter;
|
||||
|
||||
@@ -81,12 +81,12 @@ public class PipelineInterpreter implements MessageProcessor {
|
||||
private final Meter filteredOutMessages;
|
||||
private final Timer executionTime;
|
||||
private final MetricRegistry metricRegistry;
|
||||
private final ConfigurationStateUpdater stateUpdater;
|
||||
private final PipelineInterpreterStateUpdater stateUpdater;
|
||||
|
||||
@Inject
|
||||
public PipelineInterpreter(MessageQueueAcknowledger messageQueueAcknowledger,
|
||||
MetricRegistry metricRegistry,
|
||||
ConfigurationStateUpdater stateUpdater) {
|
||||
PipelineInterpreterStateUpdater stateUpdater) {
|
||||
|
||||
this.messageQueueAcknowledger = messageQueueAcknowledger;
|
||||
this.filteredOutMessages = metricRegistry.meter(name(ProcessBufferProcessor.class, "filteredOutMessages"));
|
||||
|
||||
@@ -46,8 +46,8 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||
import static org.graylog2.plugin.utilities.ratelimitedlog.RateLimitedLogFactory.createDefaultRateLimitedLog;
|
||||
|
||||
@Singleton
|
||||
public class ConfigurationStateUpdater {
|
||||
private static final RateLimitedLog log = createDefaultRateLimitedLog(ConfigurationStateUpdater.class);
|
||||
public class PipelineInterpreterStateUpdater {
|
||||
private static final RateLimitedLog log = createDefaultRateLimitedLog(PipelineInterpreterStateUpdater.class);
|
||||
|
||||
private final RuleMetricsConfigService ruleMetricsConfigService;
|
||||
private final ScheduledExecutorService scheduler;
|
||||
@@ -60,7 +60,7 @@ public class ConfigurationStateUpdater {
|
||||
private final PipelineMetricRegistry pipelineMetricRegistry;
|
||||
|
||||
@Inject
|
||||
public ConfigurationStateUpdater(RuleService ruleService,
|
||||
public PipelineInterpreterStateUpdater(RuleService ruleService,
|
||||
PipelineService pipelineService,
|
||||
PipelineStreamConnectionsService pipelineStreamConnectionsService,
|
||||
PipelineRuleParser pipelineRuleParser,
|
||||
@@ -38,20 +38,20 @@ import static org.graylog2.plugin.utilities.ratelimitedlog.RateLimitedLogFactory
|
||||
/**
|
||||
* Listens on {@link ClusterEventBus} for pipeline-related events and delegates metadata updates
|
||||
* to {@link PipelineMetadataUpdater}. Since ClusterEventBus only delivers events on the originating
|
||||
* node, this replaces the previous leader-only guard in ConfigurationStateUpdater.
|
||||
* node, this replaces the previous leader-only guard in PipelineInterpreterStateUpdater.
|
||||
*/
|
||||
@Singleton
|
||||
public class PipelineMetadataClusterEventHandler {
|
||||
|
||||
private static final RateLimitedLog log = createDefaultRateLimitedLog(PipelineMetadataClusterEventHandler.class);
|
||||
|
||||
private final Provider<ConfigurationStateUpdater> stateUpdaterProvider;
|
||||
private final Provider<PipelineInterpreterStateUpdater> stateUpdaterProvider;
|
||||
private final PipelineMetadataUpdater metadataUpdater;
|
||||
private final MongoDbPipelineMetadataService pipelineMetadataService;
|
||||
|
||||
@Inject
|
||||
public PipelineMetadataClusterEventHandler(ClusterEventBus clusterEventBus,
|
||||
Provider<ConfigurationStateUpdater> stateUpdaterProvider,
|
||||
Provider<PipelineInterpreterStateUpdater> stateUpdaterProvider,
|
||||
PipelineMetadataUpdater metadataUpdater,
|
||||
MongoDbPipelineMetadataService pipelineMetadataService) {
|
||||
this.stateUpdaterProvider = stateUpdaterProvider;
|
||||
|
||||
@@ -29,7 +29,7 @@ import jakarta.ws.rs.Produces;
|
||||
import jakarta.ws.rs.core.MediaType;
|
||||
import org.apache.shiro.authz.annotation.RequiresAuthentication;
|
||||
import org.apache.shiro.authz.annotation.RequiresPermissions;
|
||||
import org.graylog.plugins.pipelineprocessor.processors.ConfigurationStateUpdater;
|
||||
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreterStateUpdater;
|
||||
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter;
|
||||
import org.graylog.plugins.pipelineprocessor.simulator.PipelineInterpreterTracer;
|
||||
import org.graylog2.audit.jersey.NoAuditEvent;
|
||||
@@ -57,14 +57,14 @@ import java.util.Map;
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
@RequiresAuthentication
|
||||
public class SimulatorResource extends RestResource implements PluginRestResource {
|
||||
private final ConfigurationStateUpdater pipelineStateUpdater;
|
||||
private final PipelineInterpreterStateUpdater pipelineStateUpdater;
|
||||
private final StreamService streamService;
|
||||
private final MessageFactory messageFactory;
|
||||
private final PipelineInterpreter pipelineInterpreter;
|
||||
|
||||
@Inject
|
||||
public SimulatorResource(PipelineInterpreter pipelineInterpreter,
|
||||
ConfigurationStateUpdater pipelineStateUpdater,
|
||||
PipelineInterpreterStateUpdater pipelineStateUpdater,
|
||||
StreamService streamService,
|
||||
MessageFactory messageFactory) {
|
||||
this.pipelineInterpreter = pipelineInterpreter;
|
||||
|
||||
@@ -24,7 +24,7 @@ import org.apache.commons.lang.StringUtils;
|
||||
import org.graylog.plugins.pipelineprocessor.ast.Pipeline;
|
||||
import org.graylog.plugins.pipelineprocessor.ast.Rule;
|
||||
import org.graylog.plugins.pipelineprocessor.ast.Stage;
|
||||
import org.graylog.plugins.pipelineprocessor.processors.ConfigurationStateUpdater;
|
||||
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreterStateUpdater;
|
||||
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter;
|
||||
import org.graylog2.plugin.Message;
|
||||
import org.graylog2.plugin.MessageFactory;
|
||||
@@ -41,12 +41,12 @@ import java.util.UUID;
|
||||
|
||||
public class RuleSimulator {
|
||||
|
||||
private final ConfigurationStateUpdater configurationStateUpdater;
|
||||
private final PipelineInterpreterStateUpdater configurationStateUpdater;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final MessageFactory messageFactory;
|
||||
|
||||
@Inject
|
||||
public RuleSimulator(ConfigurationStateUpdater configurationStateUpdater, ObjectMapper objectMapper,
|
||||
public RuleSimulator(PipelineInterpreterStateUpdater configurationStateUpdater, ObjectMapper objectMapper,
|
||||
MessageFactory messageFactory) {
|
||||
this.configurationStateUpdater = configurationStateUpdater;
|
||||
this.objectMapper = objectMapper;
|
||||
|
||||
@@ -36,7 +36,7 @@ import org.graylog.plugins.pipelineprocessor.db.mongodb.MongoDbPipelineStreamCon
|
||||
import org.graylog.plugins.pipelineprocessor.functions.conversion.LongConversion;
|
||||
import org.graylog.plugins.pipelineprocessor.parser.FunctionRegistry;
|
||||
import org.graylog.plugins.pipelineprocessor.parser.PipelineRuleParser;
|
||||
import org.graylog.plugins.pipelineprocessor.processors.ConfigurationStateUpdater;
|
||||
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreterStateUpdater;
|
||||
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter;
|
||||
import org.graylog.plugins.pipelineprocessor.processors.PipelineResolver;
|
||||
import org.graylog.plugins.pipelineprocessor.rest.PipelineConnections;
|
||||
@@ -132,7 +132,7 @@ class MessageCreationLoopPreventionTest extends BaseParserTest {
|
||||
final FunctionRegistry functionRegistry = new FunctionRegistry(functions);
|
||||
final PipelineRuleParser parser = new PipelineRuleParser(functionRegistry);
|
||||
final MetricRegistry metricRegistry = new MetricRegistry();
|
||||
final ConfigurationStateUpdater stateUpdater = new ConfigurationStateUpdater(
|
||||
final PipelineInterpreterStateUpdater stateUpdater = new PipelineInterpreterStateUpdater(
|
||||
ruleService,
|
||||
pipelineService,
|
||||
pipelineStreamConnectionsService,
|
||||
|
||||
@@ -505,7 +505,7 @@ public class PipelineInterpreterTest {
|
||||
final RuleMetricsConfigService ruleMetricsConfigService = mock(RuleMetricsConfigService.class);
|
||||
when(ruleMetricsConfigService.get()).thenReturn(RuleMetricsConfigDto.createDefault());
|
||||
|
||||
final ConfigurationStateUpdater stateUpdater = new ConfigurationStateUpdater(
|
||||
final PipelineInterpreterStateUpdater stateUpdater = new PipelineInterpreterStateUpdater(
|
||||
ruleService,
|
||||
pipelineService,
|
||||
connectionsService,
|
||||
@@ -560,7 +560,7 @@ public class PipelineInterpreterTest {
|
||||
final PipelineRuleParser parser = new PipelineRuleParser(functionRegistry);
|
||||
|
||||
final MetricRegistry metricRegistry = new MetricRegistry();
|
||||
final ConfigurationStateUpdater stateUpdater = new ConfigurationStateUpdater(
|
||||
final PipelineInterpreterStateUpdater stateUpdater = new PipelineInterpreterStateUpdater(
|
||||
ruleService,
|
||||
pipelineService,
|
||||
pipelineStreamConnectionsService,
|
||||
@@ -618,7 +618,7 @@ public class PipelineInterpreterTest {
|
||||
final PipelineRuleParser parser = new PipelineRuleParser(functionRegistry);
|
||||
|
||||
final MetricRegistry metricRegistry = new MetricRegistry();
|
||||
final ConfigurationStateUpdater stateUpdater = new ConfigurationStateUpdater(
|
||||
final PipelineInterpreterStateUpdater stateUpdater = new PipelineInterpreterStateUpdater(
|
||||
ruleService,
|
||||
pipelineService,
|
||||
pipelineStreamConnectionsService,
|
||||
|
||||
@@ -44,7 +44,7 @@ import static org.mockito.Mockito.when;
|
||||
|
||||
class PipelineMetadataClusterEventHandlerTest {
|
||||
|
||||
private final ConfigurationStateUpdater stateUpdater = mock(ConfigurationStateUpdater.class);
|
||||
private final PipelineInterpreterStateUpdater stateUpdater = mock(PipelineInterpreterStateUpdater.class);
|
||||
private final PipelineMetadataUpdater metadataUpdater = mock(PipelineMetadataUpdater.class);
|
||||
private final MongoDbPipelineMetadataService pipelineMetadataService = mock(MongoDbPipelineMetadataService.class);
|
||||
private final PipelineInterpreter.State state = mock(PipelineInterpreter.State.class);
|
||||
@@ -54,7 +54,7 @@ class PipelineMetadataClusterEventHandlerTest {
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
Provider<ConfigurationStateUpdater> stateUpdaterProvider = () -> stateUpdater;
|
||||
Provider<PipelineInterpreterStateUpdater> stateUpdaterProvider = () -> stateUpdater;
|
||||
handler = new PipelineMetadataClusterEventHandler(
|
||||
clusterEventBus, stateUpdaterProvider, metadataUpdater, pipelineMetadataService);
|
||||
when(stateUpdater.getLatestState()).thenReturn(state);
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
package org.graylog.plugins.pipelineprocessor.simulator;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.graylog.plugins.pipelineprocessor.processors.ConfigurationStateUpdater;
|
||||
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreterStateUpdater;
|
||||
import org.graylog.testing.messages.MessagesExtension;
|
||||
import org.graylog2.plugin.Message;
|
||||
import org.graylog2.plugin.MessageFactory;
|
||||
@@ -38,7 +38,7 @@ class RuleSimulatorTest {
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
@Mock
|
||||
private ConfigurationStateUpdater configurationStateUpdater;
|
||||
private PipelineInterpreterStateUpdater configurationStateUpdater;
|
||||
|
||||
@BeforeAll
|
||||
public void setUp(MessageFactory messageFactory) {
|
||||
|
||||
Reference in New Issue
Block a user