Shutdown executor service in opensearch removal (#24940)

* Shutdown executor service in opensearch removal

* Added changelog
This commit is contained in:
Tomas Dvorak
2026-02-17 11:40:23 +01:00
committed by GitHub
parent 9b389864c3
commit 98e1112afb
3 changed files with 12 additions and 6 deletions

View File

@@ -0,0 +1,5 @@
type = "fixed"
message = "Fix thread leak in datanode opensearch removal process"
pulls = ["24940"]

View File

@@ -114,7 +114,6 @@ public class OpensearchProcessImpl implements OpensearchProcess, ProcessListener
static final String CLUSTER_ROUTING_ALLOCATION_EXCLUDE_SETTING = "cluster.routing.allocation.exclude._name";
boolean allocationExcludeChecked = false;
ScheduledExecutorService executorService;
@Inject
OpensearchProcessImpl(DatanodeConfiguration datanodeConfiguration, final CustomCAX509TrustManager trustManager,
@@ -351,8 +350,10 @@ public class OpensearchProcessImpl implements OpensearchProcess, ProcessListener
clusterClient.putSettings(settings, RequestOptions.DEFAULT);
if (response.isAcknowledged()) {
allocationExcludeChecked = false; // reset to rejoin cluster in case of failure
executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("datanode-removal").build());
executorService.scheduleAtFixedRate(this::checkRemovalStatus, 10, 10, TimeUnit.SECONDS);
@SuppressWarnings("resource")
final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("datanode-removal").build());
executorService.scheduleAtFixedRate(() -> checkRemovalStatus(executorService), 10, 10, TimeUnit.SECONDS);
} else {
throw new RuntimeException("Failed to exclude node from cluster allocation");
}
@@ -365,7 +366,7 @@ public class OpensearchProcessImpl implements OpensearchProcess, ProcessListener
/**
* started by onRemove() to check if all shards have been relocated
*/
void checkRemovalStatus() {
void checkRemovalStatus(ScheduledExecutorService executorService) {
final Optional<RestHighLevelClient> restClient = restClient();
if (restClient.isPresent()) {
try {
@@ -378,6 +379,7 @@ public class OpensearchProcessImpl implements OpensearchProcess, ProcessListener
eventBus.post(DataNodeLifecycleEvent.create(nodeId.getNodeId(), DataNodeLifecycleTrigger.REMOVED));
}
} catch (IOException | OpenSearchStatusException e) {
executorService.shutdown();
throw new RuntimeException("Error checking removal status", e);
}
}

View File

@@ -138,8 +138,7 @@ public class OpensearchProcessImplTest {
when(health.getRelocatingShards()).thenReturn(0);
when(clusterClient.health(any(), any())).thenReturn(health);
final ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
opensearchProcess.executorService = executor;
opensearchProcess.checkRemovalStatus();
opensearchProcess.checkRemovalStatus(executor);
verify(processState).fire(OpensearchEvent.PROCESS_STOPPED);
verify(executor).shutdown();
}