[AMQ-8525] run mqtt parallel-friendly tests into separate forks in parallel (#1541)

This commit is contained in:
Jean-Louis Monteiro
2026-03-03 10:01:42 +01:00
committed by GitHub
parent f7993bcc6b
commit e4d0149de4
28 changed files with 151 additions and 13 deletions

1
Jenkinsfile vendored
View File

@@ -130,6 +130,7 @@ pipeline {
}
when { expression { return params.testsEnabled } }
steps {
echo 'Running tests'
sh 'java -version'
sh 'mvn -version'

View File

@@ -28,6 +28,7 @@
<artifactId>activemq-mqtt</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ :: MQTT Protocol</name>
<description>The ActiveMQ MQTT Protocol Implementation</description>
<dependencies>
@@ -215,17 +216,54 @@
<forkCount>1</forkCount>
<argLine>-javaagent:${org.mockito:mockito-core:jar}</argLine>
<runOrder>alphabetical</runOrder>
<systemPropertyValues>
<org.apache.activemq.default.directory.prefix>target</org.apache.activemq.default.directory.prefix>
</systemPropertyValues>
<!-- includes>
<include>**/*Test.*</include>
</includes -->
<excludes>
<exclude>**/PahoMQTNioTTest.java</exclude>
</excludes>
<reportFormat>plain</reportFormat>
<excludedGroups>org.apache.activemq.transport.mqtt.ParallelTest</excludedGroups>
<systemPropertyVariables>
<java.net.preferIPv4Stack>true</java.net.preferIPv4Stack>
<org.apache.activemq.AutoFailTestSupport.disableSystemExit>true</org.apache.activemq.AutoFailTestSupport.disableSystemExit>
<org.apache.activemq.broker.jmx.createConnector>false</org.apache.activemq.broker.jmx.createConnector>
<org.apache.activemq.default.directory.prefix>${project.build.directory}/</org.apache.activemq.default.directory.prefix>
</systemPropertyVariables>
<consoleOutputReporter>
<disable>true</disable>
</consoleOutputReporter>
<statelessTestsetInfoReporter
implementation="org.apache.maven.plugin.surefire.extensions.junit5.JUnit5StatelessTestsetInfoTreeReporter">
<printStacktraceOnError>true</printStacktraceOnError>
<printStacktraceOnFailure>true</printStacktraceOnFailure>
<printStdoutOnError>true</printStdoutOnError>
<printStdoutOnFailure>true</printStdoutOnFailure>
<printStderrOnError>true</printStderrOnError>
<printStderrOnFailure>true</printStderrOnFailure>
</statelessTestsetInfoReporter>
<excludes>
<exclude>**/PahoMQTNioTTest.java</exclude>
</excludes>
</configuration>
<executions>
<execution>
<id>parallel</id>
<phase>test</phase>
<goals>
<goal>test</goal>
</goals>
<configuration>
<!-- drop the default excludedGroups -->
<excludedGroups combine.self="override"/>
<groups>org.apache.activemq.transport.mqtt.ParallelTest</groups>
<forkCount>2C</forkCount>
<reuseForks>false</reuseForks>
<forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
<systemPropertyVariables>
<org.apache.activemq.default.directory.prefix>${project.build.directory}/parallel-tests-${surefire.forkNumber}/</org.apache.activemq.default.directory.prefix>
<!-- when running MQTT tests in parallel in the CI (quite slow) we need to bump the wireformat negotiation timeout (5s by default) -->
<org.apache.activemq.transport.wireFormatNegotiationTimeout>20000</org.apache.activemq.transport.wireFormatNegotiationTimeout>
</systemPropertyVariables>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.activemq.protobuf</groupId>
<artifactId>activemq-protobuf</artifactId>

View File

@@ -47,15 +47,16 @@ import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.CONNACK;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests various use cases that require authentication or authorization over MQTT
*/
@Category(ParallelTest.class)
@RunWith(Parameterized.class)
public class MQTTAuthTest extends MQTTAuthTestSupport {

View File

@@ -41,12 +41,14 @@ import org.fusesource.mqtt.codec.SUBSCRIBE;
import org.fusesource.mqtt.codec.UNSUBSCRIBE;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests the functionality of the MQTTCodec class.
*/
@Category(ParallelTest.class)
public class MQTTCodecTest {
private static final Logger LOG = LoggerFactory.getLogger(MQTTCodecTest.class);

View File

@@ -42,9 +42,12 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.ByteSequence;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
*
*/
@Category(ParallelTest.class)
public class MQTTCompositeQueueRetainedTest extends MQTTTestSupport {
// configure composite topic

View File

@@ -35,6 +35,7 @@ import org.fusesource.mqtt.client.MQTT;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory;
* Test that connection attempts that don't send a CONNECT frame will
* get cleaned up by the inactivity monitor.
*/
@Category(ParallelTest.class)
@RunWith(Parameterized.class)
public class MQTTConnectTest extends MQTTTestSupport {

View File

@@ -31,9 +31,11 @@ import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.experimental.categories.Category;
/**
* Test that the maxFrameSize configuration value is applied across the transports.
*/
@Category(ParallelTest.class)
@RunWith(Parameterized.class)
public class MQTTMaxFrameSizeTest extends MQTTTestSupport {

View File

@@ -16,9 +16,12 @@
*/
package org.apache.activemq.transport.mqtt;
import org.junit.experimental.categories.Category;
/**
* Run the basic tests with the NIO Transport.
*/
@Category(ParallelTest.class)
public class MQTTNIOSSLTest extends MQTTTest {
@Override

View File

@@ -16,9 +16,12 @@
*/
package org.apache.activemq.transport.mqtt;
import org.junit.experimental.categories.Category;
/**
* Run the basic tests with the NIO Transport.
*/
@Category(ParallelTest.class)
public class MQTTNIOTest extends MQTTTest {
@Override

View File

@@ -29,7 +29,9 @@ import org.fusesource.mqtt.client.Topic;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(ParallelTest.class)
public class MQTTOverlapedSubscriptionsTest {
private BrokerService brokerService;

View File

@@ -52,10 +52,12 @@ import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.experimental.categories.Category;
/**
* Test to show that a PINGRESP will only be sent for a PINGREQ
* packet after a CONNECT packet has been received.
*/
@Category(ParallelTest.class)
@RunWith(Parameterized.class)
public class MQTTPingReqTest extends MQTTTestSupport {

View File

@@ -37,12 +37,14 @@ import org.fusesource.mqtt.codec.CONNECT;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
/**
* Tests for various usage scenarios of the protocol converter
*/
@Category(ParallelTest.class)
public class MQTTProtocolConverterTest {
private MQTTTransport transport;

View File

@@ -16,9 +16,12 @@
*/
package org.apache.activemq.transport.mqtt;
import org.junit.experimental.categories.Category;
/**
* Run the basic tests with the NIO Transport.
*/
@Category(ParallelTest.class)
public class MQTTSSLTest extends MQTTTest {
@Override

View File

@@ -34,9 +34,11 @@ import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.experimental.categories.Category;
/**
* Test that all previous QoS 2 subscriptions are recovered on Broker restart.
*/
@Category(ParallelTest.class)
@RunWith(Parameterized.class)
public class MQTTSubscriptionRecoveryTest extends MQTTTestSupport {

View File

@@ -66,9 +66,13 @@ import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.PUBLISH;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category(ParallelTest.class)
public class MQTTTest extends MQTTTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);

View File

@@ -39,6 +39,7 @@ import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.transport.mqtt.util.ResourceLoadingSslContext;
import org.apache.activemq.util.IOHelper;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;
@@ -53,8 +54,6 @@ public class MQTTTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTTestSupport.class);
public static final String KAHADB_DIRECTORY = "target/activemq-data/";
protected BrokerService brokerService;
protected int port;
protected String jmsUri = "vm://localhost";
@@ -143,7 +142,7 @@ public class MQTTTestSupport {
brokerService.setPersistent(isPersistent());
if (isPersistent()) {
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(new File(KAHADB_DIRECTORY + getTestName()));
kaha.setDirectory(new File(IOHelper.getDefaultDataDirectory() + "/" + getTestName()));
brokerService.setPersistenceAdapter(kaha);
}
brokerService.setAdvisorySupport(advisorySupport);

View File

@@ -42,6 +42,7 @@ import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +56,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
* Run the basic tests with the NIO Transport.
*/
@Category(ParallelTest.class)
public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest {
private static final Logger LOG = LoggerFactory.getLogger(MQTTVirtualTopicSubscriptionsTest.class);

View File

@@ -18,12 +18,14 @@ package org.apache.activemq.transport.mqtt;
import org.fusesource.mqtt.client.*;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@Category(ParallelTest.class)
public class MQTTWillTest extends MQTTTestSupport {
@Test(timeout = 60 * 1000)

View File

@@ -16,9 +16,12 @@
*/
package org.apache.activemq.transport.mqtt;
import org.junit.experimental.categories.Category;
/**
* Test the NIO transport with this Test group
*/
@Category(ParallelTest.class)
public class PahoMQTTNIOTest extends PahoMQTTTest {
@Override

View File

@@ -47,6 +47,9 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.experimental.categories.Category;
@Category(ParallelTest.class)
public class PahoMQTTTest extends MQTTTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class);

View File

@@ -22,12 +22,14 @@ import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
import static org.junit.Assert.assertEquals;
@Category(ParallelTest.class)
public class PahoVirtualTopicMQTTTest extends PahoMQTTTest {
@Override

View File

@@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
/**
* Marker interface used with {@code @Category(ParallelTest.class)} to opt a
* test class or method into the {@code all-parallel} Maven profile. Only tests
* explicitly tagged with this category execute when the profile is enabled,
* which allows a gradual migration toward full parallelism.
*/
public interface ParallelTest {
}

View File

@@ -17,10 +17,13 @@
package org.apache.activemq.transport.mqtt.auto;
import org.apache.activemq.transport.mqtt.MQTTTest;
import org.apache.activemq.transport.mqtt.ParallelTest;
import org.junit.experimental.categories.Category;
/**
* Run the basic tests with the NIO Transport.
*/
@Category(ParallelTest.class)
public class MQTTAutoNioSslTest extends MQTTTest {
@Override

View File

@@ -17,10 +17,13 @@
package org.apache.activemq.transport.mqtt.auto;
import org.apache.activemq.transport.mqtt.MQTTTest;
import org.apache.activemq.transport.mqtt.ParallelTest;
import org.junit.experimental.categories.Category;
/**
* Run the basic tests with the NIO Transport.
*/
@Category(ParallelTest.class)
public class MQTTAutoNioTest extends MQTTTest {
@Override

View File

@@ -36,6 +36,10 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.junit.experimental.categories.Category;
import org.apache.activemq.transport.mqtt.ParallelTest;
@Category(ParallelTest.class)
@RunWith(Parameterized.class)
public class MQTTAutoSslAuthTest extends MQTTTestSupport {

View File

@@ -17,10 +17,13 @@
package org.apache.activemq.transport.mqtt.auto;
import org.apache.activemq.transport.mqtt.MQTTTest;
import org.apache.activemq.transport.mqtt.ParallelTest;
import org.junit.experimental.categories.Category;
/**
* Run the basic tests with the NIO Transport.
*/
@Category(ParallelTest.class)
public class MQTTAutoSslTest extends MQTTTest {
@Override

View File

@@ -17,10 +17,13 @@
package org.apache.activemq.transport.mqtt.auto;
import org.apache.activemq.transport.mqtt.MQTTTest;
import org.apache.activemq.transport.mqtt.ParallelTest;
import org.junit.experimental.categories.Category;
/**
* Run the basic tests with the NIO Transport.
*/
@Category(ParallelTest.class)
public class MQTTAutoTest extends MQTTTest {
@Override

14
pom.xml
View File

@@ -137,6 +137,7 @@
<maven-graph-plugin-version>1.45</maven-graph-plugin-version>
<maven-core-version>3.9.12</maven-core-version>
<maven-surefire-plugin-version>3.5.3</maven-surefire-plugin-version>
<maven-surefire-junit5-tree-reporter-version>1.5.1</maven-surefire-junit5-tree-reporter-version>
<!-- OSGi bundles properties -->
<activemq.osgi.import.pkg>*</activemq.osgi.import.pkg>
<activemq.osgi.export.pkg>org.apache.activemq*</activemq.osgi.export.pkg>
@@ -974,6 +975,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin-version}</version>
<configuration>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<forkCount>1</forkCount>
@@ -992,6 +994,18 @@
<exclude>**/load/*</exclude>
</excludes>
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit47</artifactId>
<version>${maven-surefire-plugin-version}</version>
</dependency>
<dependency>
<groupId>me.fabriciorby</groupId>
<artifactId>maven-surefire-junit5-tree-reporter</artifactId>
<version>${maven-surefire-junit5-tree-reporter-version}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>