diff --git a/handler/src/test/java/io/netty/handler/ssl/SniClientTest.java b/handler/src/test/java/io/netty/handler/ssl/SniClientTest.java index 3193d200b6..ca1c9b8bf3 100644 --- a/handler/src/test/java/io/netty/handler/ssl/SniClientTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/SniClientTest.java @@ -34,7 +34,7 @@ import org.junit.Assert; import org.junit.Assume; import org.junit.Test; -import java.nio.channels.ClosedChannelException; +import javax.net.ssl.SSLException; public class SniClientTest { @@ -67,7 +67,7 @@ public class SniClientTest { SniClientJava8TestUtil.testSniClient(SslProvider.JDK, SslProvider.JDK, true); } - @Test(timeout = 30000, expected = ClosedChannelException.class) + @Test(timeout = 30000, expected = SSLException.class) public void testSniSNIMatcherDoesNotMatchClientJdkSslServerJdkSsl() throws Exception { Assume.assumeTrue(PlatformDependent.javaVersion() >= 8); SniClientJava8TestUtil.testSniClient(SslProvider.JDK, SslProvider.JDK, false); @@ -80,7 +80,7 @@ public class SniClientTest { SniClientJava8TestUtil.testSniClient(SslProvider.OPENSSL, SslProvider.OPENSSL, true); } - @Test(timeout = 30000, expected = ClosedChannelException.class) + @Test(timeout = 30000, expected = SSLException.class) public void testSniSNIMatcherDoesNotMatchClientOpenSslServerOpenSsl() throws Exception { Assume.assumeTrue(PlatformDependent.javaVersion() >= 8); Assume.assumeTrue(OpenSsl.isAvailable()); @@ -94,7 +94,7 @@ public class SniClientTest { SniClientJava8TestUtil.testSniClient(SslProvider.JDK, SslProvider.OPENSSL, true); } - @Test(timeout = 30000, expected = ClosedChannelException.class) + @Test(timeout = 30000, expected = SSLException.class) public void testSniSNIMatcherDoesNotMatchClientJdkSslServerOpenSsl() throws Exception { Assume.assumeTrue(PlatformDependent.javaVersion() >= 8); Assume.assumeTrue(OpenSsl.isAvailable()); @@ -108,7 +108,7 @@ public class SniClientTest { SniClientJava8TestUtil.testSniClient(SslProvider.OPENSSL, SslProvider.JDK, true); } - @Test(timeout = 30000, expected = ClosedChannelException.class) + @Test(timeout = 30000, expected = SSLException.class) public void testSniSNIMatcherDoesNotMatchClientOpenSslServerJdkSsl() throws Exception { Assume.assumeTrue(PlatformDependent.javaVersion() >= 8); Assume.assumeTrue(OpenSsl.isAvailable()); diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index 39cf1a7588..d18fa3a4a1 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -91,7 +91,6 @@ public class LocalChannel extends AbstractChannel { private volatile LocalAddress remoteAddress; private volatile ChannelPromise connectPromise; private volatile boolean readInProgress; - private volatile boolean registerInProgress; private volatile boolean writeInProgress; private volatile Future finishReadFuture; @@ -172,13 +171,8 @@ public class LocalChannel extends AbstractChannel { // See https://github.com/netty/netty/issues/2400 if (peer != null && parent() != null) { // Store the peer in a local variable as it may be set to null if doClose() is called. - // Because of this we also set registerInProgress to true as we check for this in doClose() and make sure - // we delay the fireChannelInactive() to be fired after the fireChannelActive() and so keep the correct - // order of events. - // // See https://github.com/netty/netty/issues/2144 final LocalChannel peer = this.peer; - registerInProgress = true; state = State.CONNECTED; peer.remoteAddress = parent() == null ? null : parent().localAddress(); @@ -191,7 +185,6 @@ public class LocalChannel extends AbstractChannel { peer.eventLoop().execute(new Runnable() { @Override public void run() { - registerInProgress = false; ChannelPromise promise = peer.connectPromise; // Only trigger fireChannelActive() if the promise was not null and was not completed yet. @@ -237,7 +230,9 @@ public class LocalChannel extends AbstractChannel { state = State.CLOSED; // Preserve order of event and force a read operation now before the close operation is processed. - finishPeerRead(this); + if (writeInProgress && peer != null) { + finishPeerRead(peer); + } ChannelPromise promise = connectPromise; if (promise != null) { @@ -249,34 +244,29 @@ public class LocalChannel extends AbstractChannel { if (peer != null) { this.peer = null; - // Need to execute the close in the correct EventLoop (see https://github.com/netty/netty/issues/1777). - // Also check if the registration was not done yet. In this case we submit the close to the EventLoop - // to make sure its run after the registration completes - // (see https://github.com/netty/netty/issues/2144). + // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true. + // This ensures that if both channels are on the same event loop, the peer's channelInActive + // event is triggered *after* this peer's channelInActive event EventLoop peerEventLoop = peer.eventLoop(); final boolean peerIsActive = peer.isActive(); - if (peerEventLoop.inEventLoop() && !registerInProgress) { - peer.tryClose(peerIsActive); - } else { - try { - peerEventLoop.execute(new Runnable() { - @Override - public void run() { - peer.tryClose(peerIsActive); - } - }); - } catch (Throwable cause) { - logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!", - this, peer, cause); - if (peerEventLoop.inEventLoop()) { - peer.releaseInboundBuffers(); - } else { - // inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or - // rejects the close Runnable but give a best effort. - peer.close(); + try { + peerEventLoop.execute(new Runnable() { + @Override + public void run() { + peer.tryClose(peerIsActive); } - PlatformDependent.throwException(cause); + }); + } catch (Throwable cause) { + logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!", + this, peer, cause); + if (peerEventLoop.inEventLoop()) { + peer.releaseInboundBuffers(); + } else { + // inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or + // rejects the close Runnable but give a best effort. + peer.close(); } + PlatformDependent.throwException(cause); } } } finally { diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java index 6d4b4686ff..a8def572b8 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java @@ -390,6 +390,73 @@ public class LocalChannelTest { } } + @Test + public void testCloseAfterWriteInSameEventLoopPreservesOrder() throws InterruptedException { + Bootstrap cb = new Bootstrap(); + ServerBootstrap sb = new ServerBootstrap(); + final CountDownLatch messageLatch = new CountDownLatch(3); + final ByteBuf data = Unpooled.wrappedBuffer(new byte[1024]); + + try { + cb.group(sharedGroup) + .channel(LocalChannel.class) + .handler(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ctx.writeAndFlush(data.retainedDuplicate()); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (data.equals(msg)) { + ReferenceCountUtil.safeRelease(msg); + messageLatch.countDown(); + } else { + super.channelRead(ctx, msg); + } + } + }); + + sb.group(sharedGroup) + .channel(LocalServerChannel.class) + .childHandler(new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (data.equals(msg)) { + messageLatch.countDown(); + ctx.writeAndFlush(data); + ctx.close(); + } else { + super.channelRead(ctx, msg); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + messageLatch.countDown(); + super.channelInactive(ctx); + } + }); + + Channel sc = null; + Channel cc = null; + try { + // Start server + sc = sb.bind(TEST_ADDRESS).syncUninterruptibly().channel(); + + // Connect to the server + cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel(); + assertTrue(messageLatch.await(5, SECONDS)); + assertFalse(cc.isOpen()); + } finally { + closeChannel(cc); + closeChannel(sc); + } + } finally { + data.release(); + } + } + @Test public void testWriteInWritePromiseCompletePreservesOrder() throws InterruptedException { Bootstrap cb = new Bootstrap(); @@ -620,85 +687,6 @@ public class LocalChannelTest { } } - @Test - public void testClosePeerInWritePromiseCompleteSameEventLoopPreservesOrder() throws InterruptedException { - Bootstrap cb = new Bootstrap(); - ServerBootstrap sb = new ServerBootstrap(); - final CountDownLatch messageLatch = new CountDownLatch(2); - final CountDownLatch serverChannelLatch = new CountDownLatch(1); - final ByteBuf data = Unpooled.wrappedBuffer(new byte[1024]); - final AtomicReference serverChannelRef = new AtomicReference(); - - try { - cb.group(sharedGroup) - .channel(LocalChannel.class) - .handler(new TestHandler()); - - sb.group(sharedGroup) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (msg.equals(data)) { - ReferenceCountUtil.safeRelease(msg); - messageLatch.countDown(); - } else { - super.channelRead(ctx, msg); - } - } - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - messageLatch.countDown(); - super.channelInactive(ctx); - } - }); - serverChannelRef.set(ch); - serverChannelLatch.countDown(); - } - }); - - Channel sc = null; - Channel cc = null; - try { - // Start server - sc = sb.bind(TEST_ADDRESS).syncUninterruptibly().channel(); - - // Connect to the server - cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel(); - - assertTrue(serverChannelLatch.await(5, SECONDS)); - - final Channel ccCpy = cc; - // Make sure a write operation is executed in the eventloop - cc.pipeline().lastContext().executor().execute(new Runnable() { - @Override - public void run() { - ChannelPromise promise = ccCpy.newPromise(); - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - serverChannelRef.get().close(); - } - }); - ccCpy.writeAndFlush(data.retainedDuplicate(), promise); - } - }); - - assertTrue(messageLatch.await(5, SECONDS)); - assertFalse(cc.isOpen()); - assertFalse(serverChannelRef.get().isOpen()); - } finally { - closeChannel(cc); - closeChannel(sc); - } - } finally { - data.release(); - } - } - @Test public void testWriteWhilePeerIsClosedReleaseObjectAndFailPromise() throws InterruptedException { Bootstrap cb = new Bootstrap();