diff --git a/changelog/unreleased/pr-24671.toml b/changelog/unreleased/pr-24671.toml new file mode 100644 index 0000000000..6b0083fcf1 --- /dev/null +++ b/changelog/unreleased/pr-24671.toml @@ -0,0 +1,4 @@ +type = "f" +message = "Fix metrix datastream creation in situations where opensearch in datanode starts too slowly" + +pulls = ["24671"] diff --git a/data-node/src/main/java/org/graylog/datanode/metrics/ConfigureMetricsIndexSettings.java b/data-node/src/main/java/org/graylog/datanode/metrics/ConfigureMetricsIndexSettings.java index 2b40af5e28..f74a4290a3 100644 --- a/data-node/src/main/java/org/graylog/datanode/metrics/ConfigureMetricsIndexSettings.java +++ b/data-node/src/main/java/org/graylog/datanode/metrics/ConfigureMetricsIndexSettings.java @@ -28,6 +28,7 @@ import org.graylog.datanode.opensearch.statemachine.OpensearchState; import org.graylog.datanode.periodicals.MetricsCollector; import org.graylog.datanode.process.statemachine.tracer.StateMachineTracer; import org.graylog.storage.opensearch2.DataStreamAdapterOS2; +import org.graylog.storage.opensearch2.OpenSearchClient; import org.graylog.storage.opensearch2.ism.IsmApi; import org.graylog2.cluster.nodes.DataNodeDto; import org.graylog2.cluster.nodes.NodeService; @@ -48,17 +49,19 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; public class ConfigureMetricsIndexSettings implements StateMachineTracer { private final Logger log = LoggerFactory.getLogger(ConfigureMetricsIndexSettings.class); + private final AtomicBoolean datastreamCreated = new AtomicBoolean(false); + private final OpensearchProcess process; private final Configuration configuration; private final IndexFieldTypesService indexFieldTypesService; private final ObjectMapper objectMapper; - private DataStreamService dataStreamService; private final NodeService nodeService; @Inject @@ -76,23 +79,33 @@ public class ConfigureMetricsIndexSettings implements StateMachineTracerdestination combination, because the process may become UNAVAILABLE before it + // finally starts (due to slow startup or limited resources) and then the source state is not starting, + // as originally assumed. Instead, let's remember if we have already created a datastream once. + if (destination == OpensearchState.AVAILABLE && process.isManagerNode()) { process.openSearchClient().ifPresent(client -> { - final IsmApi ismApi = new IsmApi(client, objectMapper); - int replicas = nodeService.allActive().size() == 1 ? 0 : 1; - dataStreamService = new DataStreamServiceImpl( - new DataStreamAdapterOS2(client, objectMapper, ismApi), - indexFieldTypesService, - replicas - ); - dataStreamService.createDataStream(configuration.getMetricsStream(), - configuration.getMetricsTimestamp(), - createMappings(), - createPolicy(configuration)); + if (datastreamCreated.compareAndSet(false, true)) { + createDatastream(client); + } }); } } + private void createDatastream(OpenSearchClient client) { + final IsmApi ismApi = new IsmApi(client, objectMapper); + int replicas = nodeService.allActive().size() == 1 ? 0 : 1; + + final DataStreamService dataStreamService = new DataStreamServiceImpl( + new DataStreamAdapterOS2(client, objectMapper, ismApi), + indexFieldTypesService, + replicas + ); + dataStreamService.createDataStream(configuration.getMetricsStream(), + configuration.getMetricsTimestamp(), + createMappings(), + createPolicy(configuration)); + } + private Map> createMappings() { Map> mappings = new HashMap<>(); mappings.put("node", ImmutableMap.of("type", "keyword"));