From 98e1112afb279f174fddbc688b06b938533f7bda Mon Sep 17 00:00:00 2001 From: Tomas Dvorak Date: Tue, 17 Feb 2026 11:40:23 +0100 Subject: [PATCH] Shutdown executor service in opensearch removal (#24940) * Shutdown executor service in opensearch removal * Added changelog --- changelog/unreleased/pr-24940.toml | 5 +++++ .../datanode/opensearch/OpensearchProcessImpl.java | 10 ++++++---- .../datanode/opensearch/OpensearchProcessImplTest.java | 3 +-- 3 files changed, 12 insertions(+), 6 deletions(-) create mode 100644 changelog/unreleased/pr-24940.toml diff --git a/changelog/unreleased/pr-24940.toml b/changelog/unreleased/pr-24940.toml new file mode 100644 index 0000000000..c35dc9ffcf --- /dev/null +++ b/changelog/unreleased/pr-24940.toml @@ -0,0 +1,5 @@ +type = "fixed" +message = "Fix thread leak in datanode opensearch removal process" + +pulls = ["24940"] + diff --git a/data-node/src/main/java/org/graylog/datanode/opensearch/OpensearchProcessImpl.java b/data-node/src/main/java/org/graylog/datanode/opensearch/OpensearchProcessImpl.java index 7736631dd5..636797d201 100644 --- a/data-node/src/main/java/org/graylog/datanode/opensearch/OpensearchProcessImpl.java +++ b/data-node/src/main/java/org/graylog/datanode/opensearch/OpensearchProcessImpl.java @@ -114,7 +114,6 @@ public class OpensearchProcessImpl implements OpensearchProcess, ProcessListener static final String CLUSTER_ROUTING_ALLOCATION_EXCLUDE_SETTING = "cluster.routing.allocation.exclude._name"; boolean allocationExcludeChecked = false; - ScheduledExecutorService executorService; @Inject OpensearchProcessImpl(DatanodeConfiguration datanodeConfiguration, final CustomCAX509TrustManager trustManager, @@ -351,8 +350,10 @@ public class OpensearchProcessImpl implements OpensearchProcess, ProcessListener clusterClient.putSettings(settings, RequestOptions.DEFAULT); if (response.isAcknowledged()) { allocationExcludeChecked = false; // reset to rejoin cluster in case of failure - executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("datanode-removal").build()); - executorService.scheduleAtFixedRate(this::checkRemovalStatus, 10, 10, TimeUnit.SECONDS); + @SuppressWarnings("resource") + final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("datanode-removal").build()); + executorService.scheduleAtFixedRate(() -> checkRemovalStatus(executorService), 10, 10, TimeUnit.SECONDS); } else { throw new RuntimeException("Failed to exclude node from cluster allocation"); } @@ -365,7 +366,7 @@ public class OpensearchProcessImpl implements OpensearchProcess, ProcessListener /** * started by onRemove() to check if all shards have been relocated */ - void checkRemovalStatus() { + void checkRemovalStatus(ScheduledExecutorService executorService) { final Optional restClient = restClient(); if (restClient.isPresent()) { try { @@ -378,6 +379,7 @@ public class OpensearchProcessImpl implements OpensearchProcess, ProcessListener eventBus.post(DataNodeLifecycleEvent.create(nodeId.getNodeId(), DataNodeLifecycleTrigger.REMOVED)); } } catch (IOException | OpenSearchStatusException e) { + executorService.shutdown(); throw new RuntimeException("Error checking removal status", e); } } diff --git a/data-node/src/test/java/org/graylog/datanode/opensearch/OpensearchProcessImplTest.java b/data-node/src/test/java/org/graylog/datanode/opensearch/OpensearchProcessImplTest.java index f524b88124..e05a2bc190 100644 --- a/data-node/src/test/java/org/graylog/datanode/opensearch/OpensearchProcessImplTest.java +++ b/data-node/src/test/java/org/graylog/datanode/opensearch/OpensearchProcessImplTest.java @@ -138,8 +138,7 @@ public class OpensearchProcessImplTest { when(health.getRelocatingShards()).thenReturn(0); when(clusterClient.health(any(), any())).thenReturn(health); final ScheduledExecutorService executor = mock(ScheduledExecutorService.class); - opensearchProcess.executorService = executor; - opensearchProcess.checkRemovalStatus(); + opensearchProcess.checkRemovalStatus(executor); verify(processState).fire(OpensearchEvent.PROCESS_STOPPED); verify(executor).shutdown(); }