diff --git a/activemq-http/pom.xml b/activemq-http/pom.xml index 0a9cc4eec3..2bae67a3f1 100644 --- a/activemq-http/pom.xml +++ b/activemq-http/pom.xml @@ -177,6 +177,8 @@ maven-surefire-plugin 1 + + 600 diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionExceptionTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionExceptionTest.java index 3806fe05fc..83054413e4 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionExceptionTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionExceptionTest.java @@ -168,6 +168,7 @@ public class JournalCorruptionExceptionTest { randomAccessFile.seek(offset); randomAccessFile.write(bla, 0, bla.length); randomAccessFile.getFD().sync(); + dataFile.closeRandomAccessFile(randomAccessFile); } private int getNumberOfJournalFiles() throws IOException { diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java index 0a4d4fcb5c..d69594ff35 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java @@ -34,6 +34,7 @@ import jakarta.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.IOHelper; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.store.kahadb.disk.journal.DataFile; import org.apache.activemq.store.kahadb.disk.journal.Journal; @@ -185,9 +186,9 @@ public class JournalCorruptionIndexRecoveryTest { } private void whackIndex(File dataDir) { - File indexToDelete = new File(dataDir, "db.data"); + final File indexToDelete = new File(dataDir, "db.data"); LOG.info("Whacking index: " + indexToDelete); - indexToDelete.delete(); + IOHelper.deleteFileNonBlocking(indexToDelete); } private void corruptBatchMiddle(int i) throws IOException { @@ -228,6 +229,7 @@ public class JournalCorruptionIndexRecoveryTest { Arrays.fill(bla, fill); randomAccessFile.seek(offset); randomAccessFile.write(bla, 0, bla.length); + dataFile.closeRandomAccessFile(randomAccessFile); } private int getNumberOfJournalFiles() throws IOException { diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java index 8a34b88944..6ec60213e1 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.store.kahadb; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.IOHelper; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; @@ -306,9 +307,9 @@ public class JournalFdRecoveryTest { } private void whackFile(File dataDir, String name) throws Exception { - File indexToDelete = new File(dataDir, name); + final File indexToDelete = new File(dataDir, name); LOG.info("Whacking index: " + indexToDelete); - indexToDelete.delete(); + IOHelper.deleteFileNonBlocking(indexToDelete); } private int getNumberOfJournalFiles() throws IOException { diff --git a/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java b/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java index 3d24867ddb..e36afa66b4 100644 --- a/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java +++ b/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java @@ -35,9 +35,12 @@ import jakarta.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerRegistry; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; +import org.apache.activemq.util.Wait; import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; @@ -336,26 +339,27 @@ public class LoadBalanceTest { broker.stop(); } - // need to ensure broker bridge is alive before starting the consumer - // peeking at the internals will give us this info + // Wait until both brokers have their local consumer AND the remote demand subscription + // from the other broker's bridge (>= 2 consumers per queue). This guarantees: + // 1. Both local consumers (container1, container2) are truly subscribed + // 2. The network bridges are fully started and have propagated demand subscriptions private void waitForBridgeFormation() throws Exception { - long done = System.currentTimeMillis() + 30000; - while (done > System.currentTimeMillis()) { - if (hasBridge("one") && hasBridge("two")) { - return; - } - Thread.sleep(1000); - } + assertTrue("Both brokers should have local + bridge demand consumers for " + TESTING_QUEUE, + Wait.waitFor(() -> getQueueConsumerCount("one") >= 2 && getQueueConsumerCount("two") >= 2, + 30000, 100)); } - private boolean hasBridge(String name) { - boolean result = false; - BrokerService broker = BrokerRegistry.getInstance().lookup(name); - if (broker != null && !broker.getNetworkConnectors().isEmpty()) { - if (!broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) { - result = true; - } + private int getQueueConsumerCount(String brokerName) { + try { + final BrokerService broker = BrokerRegistry.getInstance().lookup(brokerName); + if (broker == null) { + return 0; } - return result; + final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); + final Destination dest = regionBroker.getDestinationMap().get(new ActiveMQQueue(TESTING_QUEUE)); + return dest != null ? dest.getConsumers().size() : 0; + } catch (Exception ignored) { + return 0; + } } } diff --git a/pom.xml b/pom.xml index 690f8a31f6..58a3aaf78a 100644 --- a/pom.xml +++ b/pom.xml @@ -35,6 +35,11 @@ + + 300 activemq-${project.version} Apache ActiveMQ @@ -973,6 +978,7 @@ true 1 false + ${surefire.forkedProcessTimeout} false true