mirror of
https://github.com/apache/activemq.git
synced 2026-03-13 08:02:28 +08:00
fix(kahadb-store): Fix Windows CI hanging by adding timeout for forke… (#1708)
* fix(kahadb-store): Fix Windows CI hanging by adding timeout for forked processes and ensuring proper resource cleanup * fix(http): Increase forked process timeout for slower HTTP/WS/SSL tests * Refactor LoadBalanceTest to improve bridge formation wait logic and add consumer count check
This commit is contained in:
committed by
GitHub
parent
50119e0147
commit
c4255bddc3
@@ -177,6 +177,8 @@
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<configuration>
|
||||
<forkCount>1</forkCount>
|
||||
<!-- HTTP/WS/SSL tests are inherently slower: tests have legitimate Thread.sleep() calls up to 35s -->
|
||||
<forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
|
||||
@@ -168,6 +168,7 @@ public class JournalCorruptionExceptionTest {
|
||||
randomAccessFile.seek(offset);
|
||||
randomAccessFile.write(bla, 0, bla.length);
|
||||
randomAccessFile.getFD().sync();
|
||||
dataFile.closeRandomAccessFile(randomAccessFile);
|
||||
}
|
||||
|
||||
private int getNumberOfJournalFiles() throws IOException {
|
||||
|
||||
@@ -34,6 +34,7 @@ import jakarta.jms.Session;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.util.IOHelper;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
|
||||
import org.apache.activemq.store.kahadb.disk.journal.Journal;
|
||||
@@ -185,9 +186,9 @@ public class JournalCorruptionIndexRecoveryTest {
|
||||
}
|
||||
|
||||
private void whackIndex(File dataDir) {
|
||||
File indexToDelete = new File(dataDir, "db.data");
|
||||
final File indexToDelete = new File(dataDir, "db.data");
|
||||
LOG.info("Whacking index: " + indexToDelete);
|
||||
indexToDelete.delete();
|
||||
IOHelper.deleteFileNonBlocking(indexToDelete);
|
||||
}
|
||||
|
||||
private void corruptBatchMiddle(int i) throws IOException {
|
||||
@@ -228,6 +229,7 @@ public class JournalCorruptionIndexRecoveryTest {
|
||||
Arrays.fill(bla, fill);
|
||||
randomAccessFile.seek(offset);
|
||||
randomAccessFile.write(bla, 0, bla.length);
|
||||
dataFile.closeRandomAccessFile(randomAccessFile);
|
||||
}
|
||||
|
||||
private int getNumberOfJournalFiles() throws IOException {
|
||||
|
||||
@@ -18,6 +18,7 @@ package org.apache.activemq.store.kahadb;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.util.IOHelper;
|
||||
import org.apache.activemq.broker.region.RegionBroker;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
@@ -306,9 +307,9 @@ public class JournalFdRecoveryTest {
|
||||
}
|
||||
|
||||
private void whackFile(File dataDir, String name) throws Exception {
|
||||
File indexToDelete = new File(dataDir, name);
|
||||
final File indexToDelete = new File(dataDir, name);
|
||||
LOG.info("Whacking index: " + indexToDelete);
|
||||
indexToDelete.delete();
|
||||
IOHelper.deleteFileNonBlocking(indexToDelete);
|
||||
}
|
||||
|
||||
private int getNumberOfJournalFiles() throws IOException {
|
||||
|
||||
@@ -35,9 +35,12 @@ import jakarta.jms.TextMessage;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerRegistry;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.RegionBroker;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.network.NetworkConnector;
|
||||
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
@@ -336,26 +339,27 @@ public class LoadBalanceTest {
|
||||
broker.stop();
|
||||
}
|
||||
|
||||
// need to ensure broker bridge is alive before starting the consumer
|
||||
// peeking at the internals will give us this info
|
||||
// Wait until both brokers have their local consumer AND the remote demand subscription
|
||||
// from the other broker's bridge (>= 2 consumers per queue). This guarantees:
|
||||
// 1. Both local consumers (container1, container2) are truly subscribed
|
||||
// 2. The network bridges are fully started and have propagated demand subscriptions
|
||||
private void waitForBridgeFormation() throws Exception {
|
||||
long done = System.currentTimeMillis() + 30000;
|
||||
while (done > System.currentTimeMillis()) {
|
||||
if (hasBridge("one") && hasBridge("two")) {
|
||||
return;
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
assertTrue("Both brokers should have local + bridge demand consumers for " + TESTING_QUEUE,
|
||||
Wait.waitFor(() -> getQueueConsumerCount("one") >= 2 && getQueueConsumerCount("two") >= 2,
|
||||
30000, 100));
|
||||
}
|
||||
|
||||
private boolean hasBridge(String name) {
|
||||
boolean result = false;
|
||||
BrokerService broker = BrokerRegistry.getInstance().lookup(name);
|
||||
if (broker != null && !broker.getNetworkConnectors().isEmpty()) {
|
||||
if (!broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
|
||||
result = true;
|
||||
}
|
||||
private int getQueueConsumerCount(String brokerName) {
|
||||
try {
|
||||
final BrokerService broker = BrokerRegistry.getInstance().lookup(brokerName);
|
||||
if (broker == null) {
|
||||
return 0;
|
||||
}
|
||||
return result;
|
||||
final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
|
||||
final Destination dest = regionBroker.getDestinationMap().get(new ActiveMQQueue(TESTING_QUEUE));
|
||||
return dest != null ? dest.getConsumers().size() : 0;
|
||||
} catch (Exception ignored) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
6
pom.xml
6
pom.xml
@@ -35,6 +35,11 @@
|
||||
|
||||
<surefire.argLine></surefire.argLine>
|
||||
<maven.surefire.allow.securitymanager></maven.surefire.allow.securitymanager>
|
||||
<!-- Per-class JVM timeout (reuseForks=false means one JVM per test class).
|
||||
Kills a forked JVM that has been running longer than this many seconds,
|
||||
preventing CI from hanging indefinitely on Windows file-lock issues or
|
||||
any other test that never terminates. Override per-module if needed. -->
|
||||
<surefire.forkedProcessTimeout>300</surefire.forkedProcessTimeout>
|
||||
|
||||
<siteId>activemq-${project.version}</siteId>
|
||||
<projectName>Apache ActiveMQ</projectName>
|
||||
@@ -973,6 +978,7 @@
|
||||
<redirectTestOutputToFile>true</redirectTestOutputToFile>
|
||||
<forkCount>1</forkCount>
|
||||
<reuseForks>false</reuseForks>
|
||||
<forkedProcessTimeoutInSeconds>${surefire.forkedProcessTimeout}</forkedProcessTimeoutInSeconds>
|
||||
<failIfNoTests>false</failIfNoTests>
|
||||
<systemPropertyVariables>
|
||||
<java.awt.headless>true</java.awt.headless>
|
||||
|
||||
Reference in New Issue
Block a user