Revert "[AMQ-9392] Prevent InactivityMonitor read check Timer leak when TCP c…"

This reverts commit b6037c720b.
This commit is contained in:
JB Onofré
2025-11-04 20:46:11 +01:00
committed by GitHub
parent 9430b3505b
commit 67fee7bd34
2 changed files with 56 additions and 86 deletions

View File

@@ -437,17 +437,6 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
synchronized (AbstractInactivityMonitor.class) {
READ_CHECK_TIMER.purge();
CHECKER_COUNTER--;
if (CHECKER_COUNTER == 0) {
if (READ_CHECK_TIMER != null) {
READ_CHECK_TIMER.cancel();
READ_CHECK_TIMER = null;
}
try {
ThreadPoolUtils.shutdownGraceful(ASYNC_TASKS, 0);
} finally {
ASYNC_TASKS = null;
}
}
}
}
}
@@ -508,14 +497,10 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
READ_CHECK_TIMER.purge();
CHECKER_COUNTER--;
if (CHECKER_COUNTER == 0) {
if (WRITE_CHECK_TIMER != null) {
WRITE_CHECK_TIMER.cancel();
WRITE_CHECK_TIMER = null;
}
if (READ_CHECK_TIMER != null) {
READ_CHECK_TIMER.cancel();
READ_CHECK_TIMER = null;
}
WRITE_CHECK_TIMER.cancel();
READ_CHECK_TIMER.cancel();
WRITE_CHECK_TIMER = null;
READ_CHECK_TIMER = null;
try {
ThreadPoolUtils.shutdownGraceful(ASYNC_TASKS, 0);
} finally {

View File

@@ -16,18 +16,9 @@
*/
package org.apache.activemq.transport.tcp;
import static java.lang.Thread.getAllStackTraces;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsCollectionContaining.hasItem;
import static org.hamcrest.core.IsNot.not;
import java.io.IOException;
import java.net.SocketException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -42,7 +33,6 @@ import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportServer;
import org.hamcrest.Matcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,7 +73,32 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
*/
private void startClient() throws Exception, URISyntaxException {
clientTransport = TransportFactory.connect(new URI("tcp://localhost:" + serverPort + "?trace=true&wireFormat.maxInactivityDuration=1000"));
clientTransport.setTransportListener(new TestClientTransportListener());
clientTransport.setTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) {
clientReceiveCount.incrementAndGet();
if (clientRunOnCommand != null) {
clientRunOnCommand.run();
}
}
@Override
public void onException(IOException error) {
if (!ignoreClientError.get()) {
LOG.info("Client transport error:");
error.printStackTrace();
clientErrorCount.incrementAndGet();
}
}
@Override
public void transportInterupted() {
}
@Override
public void transportResumed() {
}
});
clientTransport.start();
}
@@ -166,7 +181,32 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
// Manually create a client transport so that it does not send KeepAlive
// packets. this should simulate a client hang.
clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:" + serverPort), null);
clientTransport.setTransportListener(new TestClientTransportListener());
clientTransport.setTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) {
clientReceiveCount.incrementAndGet();
if (clientRunOnCommand != null) {
clientRunOnCommand.run();
}
}
@Override
public void onException(IOException error) {
if (!ignoreClientError.get()) {
LOG.info("Client transport error:");
error.printStackTrace();
clientErrorCount.incrementAndGet();
}
}
@Override
public void transportInterupted() {
}
@Override
public void transportResumed() {
}
});
clientTransport.start();
WireFormatInfo info = new WireFormatInfo();
@@ -197,34 +237,6 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
assertEquals(0, serverErrorCount.get());
}
public void testReadCheckTimerIsNotLeakedOnError() throws Exception {
// Intentionally picks a port that is not the listening port to generate a failure
clientTransport = TransportFactory.connect(new URI("tcp://localhost:" + (serverPort ^ 1)));
clientTransport.setTransportListener(new TestClientTransportListener());
// Control test to verify there was no timer from a previous test
assertThat(getCurrentThreadNames(), not(hasReadCheckTimer()));
try {
clientTransport.start();
fail("A ConnectionException was expected");
} catch (SocketException e) {
// A SocketException is expected.
}
// If there is any read check timer at this point, calling stop should clean it up (because CHECKER_COUNTER becomes 0)
clientTransport.stop();
assertThat(getCurrentThreadNames(), not(hasReadCheckTimer()));
}
private static Matcher<Iterable<? super String>> hasReadCheckTimer() {
return hasItem("ActiveMQ InactivityMonitor ReadCheckTimer");
}
private static List<String> getCurrentThreadNames() {
return getAllStackTraces().keySet().stream().map(Thread::getName).collect(toList());
}
/**
* Used to test when a operation blocks. This should not cause transport to
* get disconnected.
@@ -260,31 +272,4 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
assertEquals(0, clientErrorCount.get());
assertEquals(0, serverErrorCount.get());
}
private class TestClientTransportListener implements TransportListener {
@Override
public void onCommand(Object command) {
clientReceiveCount.incrementAndGet();
if (clientRunOnCommand != null) {
clientRunOnCommand.run();
}
}
@Override
public void onException(IOException error) {
if (!ignoreClientError.get()) {
LOG.info("Client transport error:");
error.printStackTrace();
clientErrorCount.incrementAndGet();
}
}
@Override
public void transportInterupted() {
}
@Override
public void transportResumed() {
}
}
}