Fix datastreams init in datanode (#24671)

* Fix datastreams init in datanode

* Added changelog
This commit is contained in:
Tomas Dvorak
2026-01-13 08:16:59 +01:00
committed by GitHub
parent 050c1ae4e0
commit 83adbea30d
2 changed files with 30 additions and 13 deletions

View File

@@ -0,0 +1,4 @@
type = "f"
message = "Fix metrix datastream creation in situations where opensearch in datanode starts too slowly"
pulls = ["24671"]

View File

@@ -28,6 +28,7 @@ import org.graylog.datanode.opensearch.statemachine.OpensearchState;
import org.graylog.datanode.periodicals.MetricsCollector;
import org.graylog.datanode.process.statemachine.tracer.StateMachineTracer;
import org.graylog.storage.opensearch2.DataStreamAdapterOS2;
import org.graylog.storage.opensearch2.OpenSearchClient;
import org.graylog.storage.opensearch2.ism.IsmApi;
import org.graylog2.cluster.nodes.DataNodeDto;
import org.graylog2.cluster.nodes.NodeService;
@@ -48,17 +49,19 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
public class ConfigureMetricsIndexSettings implements StateMachineTracer<OpensearchState, OpensearchEvent> {
private final Logger log = LoggerFactory.getLogger(ConfigureMetricsIndexSettings.class);
private final AtomicBoolean datastreamCreated = new AtomicBoolean(false);
private final OpensearchProcess process;
private final Configuration configuration;
private final IndexFieldTypesService indexFieldTypesService;
private final ObjectMapper objectMapper;
private DataStreamService dataStreamService;
private final NodeService<DataNodeDto> nodeService;
@Inject
@@ -76,23 +79,33 @@ public class ConfigureMetricsIndexSettings implements StateMachineTracer<Opensea
@Override
public void transition(OpensearchEvent trigger, OpensearchState source, OpensearchState destination) {
if (destination == OpensearchState.AVAILABLE && source == OpensearchState.STARTING && process.isManagerNode()) {
// we can't rely on the source->destination combination, because the process may become UNAVAILABLE before it
// finally starts (due to slow startup or limited resources) and then the source state is not starting,
// as originally assumed. Instead, let's remember if we have already created a datastream once.
if (destination == OpensearchState.AVAILABLE && process.isManagerNode()) {
process.openSearchClient().ifPresent(client -> {
final IsmApi ismApi = new IsmApi(client, objectMapper);
int replicas = nodeService.allActive().size() == 1 ? 0 : 1;
dataStreamService = new DataStreamServiceImpl(
new DataStreamAdapterOS2(client, objectMapper, ismApi),
indexFieldTypesService,
replicas
);
dataStreamService.createDataStream(configuration.getMetricsStream(),
configuration.getMetricsTimestamp(),
createMappings(),
createPolicy(configuration));
if (datastreamCreated.compareAndSet(false, true)) {
createDatastream(client);
}
});
}
}
private void createDatastream(OpenSearchClient client) {
final IsmApi ismApi = new IsmApi(client, objectMapper);
int replicas = nodeService.allActive().size() == 1 ? 0 : 1;
final DataStreamService dataStreamService = new DataStreamServiceImpl(
new DataStreamAdapterOS2(client, objectMapper, ismApi),
indexFieldTypesService,
replicas
);
dataStreamService.createDataStream(configuration.getMetricsStream(),
configuration.getMetricsTimestamp(),
createMappings(),
createPolicy(configuration));
}
private Map<String, Map<String, String>> createMappings() {
Map<String, Map<String, String>> mappings = new HashMap<>();
mappings.put("node", ImmutableMap.of("type", "keyword"));