From 805ac002e679aff86d86b7001a7b958c98670704 Mon Sep 17 00:00:00 2001 From: louxiu Date: Fri, 8 Dec 2017 09:05:57 +0800 Subject: [PATCH] FIX: force a read operation for peer instead of self (#7454) * FIX: force a read operation for peer instead of self Motivation: When A is in `writeInProgress` and call self close, A should `finishPeerRead` for B(A' peer). Modifications: Call `finishPeerRead` with peer in `LocalChannel#doClose` Result: Clear confuse of code logic * FIX: preserves order of close after write in same event loop Motivation: If client and server(client's peer channel) are in same event loop, client writes data to server in `ChannelActive`. Server receives the data and write it back. The client's read can't be triggered becasue client's `ChannelActive` is not finished at this point and its `readInProgress` is false. Then server closes itself, it will also close the client's channel. And client has no chance to receive the data. Modifications: 1. Add a test case to demonstrate the problem 2. When `doClose` peer, we always call `peer.eventLoop().execute()` and `registerInProgress` is not needed. 3. Remove test case `testClosePeerInWritePromiseCompleteSameEventLoopPreservesOrder`. This test case can't pass becasue of this commit. IMHO, I think it is OK, becasue it is reasonable that the client flushes the data to socket, then server close the channel without received the data. 4. For mismatch test in SniClientTest, the client should receive server's alert before closed(caused by server's close) Result: The problem is gone. --- .../io/netty/handler/ssl/SniClientTest.java | 10 +- .../io/netty/channel/local/LocalChannel.java | 54 +++---- .../netty/channel/local/LocalChannelTest.java | 146 ++++++++---------- 3 files changed, 94 insertions(+), 116 deletions(-) diff --git a/handler/src/test/java/io/netty/handler/ssl/SniClientTest.java b/handler/src/test/java/io/netty/handler/ssl/SniClientTest.java index 3193d200b6..ca1c9b8bf3 100644 --- a/handler/src/test/java/io/netty/handler/ssl/SniClientTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/SniClientTest.java @@ -34,7 +34,7 @@ import org.junit.Assert; import org.junit.Assume; import org.junit.Test; -import java.nio.channels.ClosedChannelException; +import javax.net.ssl.SSLException; public class SniClientTest { @@ -67,7 +67,7 @@ public class SniClientTest { SniClientJava8TestUtil.testSniClient(SslProvider.JDK, SslProvider.JDK, true); } - @Test(timeout = 30000, expected = ClosedChannelException.class) + @Test(timeout = 30000, expected = SSLException.class) public void testSniSNIMatcherDoesNotMatchClientJdkSslServerJdkSsl() throws Exception { Assume.assumeTrue(PlatformDependent.javaVersion() >= 8); SniClientJava8TestUtil.testSniClient(SslProvider.JDK, SslProvider.JDK, false); @@ -80,7 +80,7 @@ public class SniClientTest { SniClientJava8TestUtil.testSniClient(SslProvider.OPENSSL, SslProvider.OPENSSL, true); } - @Test(timeout = 30000, expected = ClosedChannelException.class) + @Test(timeout = 30000, expected = SSLException.class) public void testSniSNIMatcherDoesNotMatchClientOpenSslServerOpenSsl() throws Exception { Assume.assumeTrue(PlatformDependent.javaVersion() >= 8); Assume.assumeTrue(OpenSsl.isAvailable()); @@ -94,7 +94,7 @@ public class SniClientTest { SniClientJava8TestUtil.testSniClient(SslProvider.JDK, SslProvider.OPENSSL, true); } - @Test(timeout = 30000, expected = ClosedChannelException.class) + @Test(timeout = 30000, expected = SSLException.class) public void testSniSNIMatcherDoesNotMatchClientJdkSslServerOpenSsl() throws Exception { Assume.assumeTrue(PlatformDependent.javaVersion() >= 8); Assume.assumeTrue(OpenSsl.isAvailable()); @@ -108,7 +108,7 @@ public class SniClientTest { SniClientJava8TestUtil.testSniClient(SslProvider.OPENSSL, SslProvider.JDK, true); } - @Test(timeout = 30000, expected = ClosedChannelException.class) + @Test(timeout = 30000, expected = SSLException.class) public void testSniSNIMatcherDoesNotMatchClientOpenSslServerJdkSsl() throws Exception { Assume.assumeTrue(PlatformDependent.javaVersion() >= 8); Assume.assumeTrue(OpenSsl.isAvailable()); diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index 39cf1a7588..d18fa3a4a1 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -91,7 +91,6 @@ public class LocalChannel extends AbstractChannel { private volatile LocalAddress remoteAddress; private volatile ChannelPromise connectPromise; private volatile boolean readInProgress; - private volatile boolean registerInProgress; private volatile boolean writeInProgress; private volatile Future finishReadFuture; @@ -172,13 +171,8 @@ public class LocalChannel extends AbstractChannel { // See https://github.com/netty/netty/issues/2400 if (peer != null && parent() != null) { // Store the peer in a local variable as it may be set to null if doClose() is called. - // Because of this we also set registerInProgress to true as we check for this in doClose() and make sure - // we delay the fireChannelInactive() to be fired after the fireChannelActive() and so keep the correct - // order of events. - // // See https://github.com/netty/netty/issues/2144 final LocalChannel peer = this.peer; - registerInProgress = true; state = State.CONNECTED; peer.remoteAddress = parent() == null ? null : parent().localAddress(); @@ -191,7 +185,6 @@ public class LocalChannel extends AbstractChannel { peer.eventLoop().execute(new Runnable() { @Override public void run() { - registerInProgress = false; ChannelPromise promise = peer.connectPromise; // Only trigger fireChannelActive() if the promise was not null and was not completed yet. @@ -237,7 +230,9 @@ public class LocalChannel extends AbstractChannel { state = State.CLOSED; // Preserve order of event and force a read operation now before the close operation is processed. - finishPeerRead(this); + if (writeInProgress && peer != null) { + finishPeerRead(peer); + } ChannelPromise promise = connectPromise; if (promise != null) { @@ -249,34 +244,29 @@ public class LocalChannel extends AbstractChannel { if (peer != null) { this.peer = null; - // Need to execute the close in the correct EventLoop (see https://github.com/netty/netty/issues/1777). - // Also check if the registration was not done yet. In this case we submit the close to the EventLoop - // to make sure its run after the registration completes - // (see https://github.com/netty/netty/issues/2144). + // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true. + // This ensures that if both channels are on the same event loop, the peer's channelInActive + // event is triggered *after* this peer's channelInActive event EventLoop peerEventLoop = peer.eventLoop(); final boolean peerIsActive = peer.isActive(); - if (peerEventLoop.inEventLoop() && !registerInProgress) { - peer.tryClose(peerIsActive); - } else { - try { - peerEventLoop.execute(new Runnable() { - @Override - public void run() { - peer.tryClose(peerIsActive); - } - }); - } catch (Throwable cause) { - logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!", - this, peer, cause); - if (peerEventLoop.inEventLoop()) { - peer.releaseInboundBuffers(); - } else { - // inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or - // rejects the close Runnable but give a best effort. - peer.close(); + try { + peerEventLoop.execute(new Runnable() { + @Override + public void run() { + peer.tryClose(peerIsActive); } - PlatformDependent.throwException(cause); + }); + } catch (Throwable cause) { + logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!", + this, peer, cause); + if (peerEventLoop.inEventLoop()) { + peer.releaseInboundBuffers(); + } else { + // inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or + // rejects the close Runnable but give a best effort. + peer.close(); } + PlatformDependent.throwException(cause); } } } finally { diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java index 6d4b4686ff..a8def572b8 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java @@ -390,6 +390,73 @@ public class LocalChannelTest { } } + @Test + public void testCloseAfterWriteInSameEventLoopPreservesOrder() throws InterruptedException { + Bootstrap cb = new Bootstrap(); + ServerBootstrap sb = new ServerBootstrap(); + final CountDownLatch messageLatch = new CountDownLatch(3); + final ByteBuf data = Unpooled.wrappedBuffer(new byte[1024]); + + try { + cb.group(sharedGroup) + .channel(LocalChannel.class) + .handler(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ctx.writeAndFlush(data.retainedDuplicate()); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (data.equals(msg)) { + ReferenceCountUtil.safeRelease(msg); + messageLatch.countDown(); + } else { + super.channelRead(ctx, msg); + } + } + }); + + sb.group(sharedGroup) + .channel(LocalServerChannel.class) + .childHandler(new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (data.equals(msg)) { + messageLatch.countDown(); + ctx.writeAndFlush(data); + ctx.close(); + } else { + super.channelRead(ctx, msg); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + messageLatch.countDown(); + super.channelInactive(ctx); + } + }); + + Channel sc = null; + Channel cc = null; + try { + // Start server + sc = sb.bind(TEST_ADDRESS).syncUninterruptibly().channel(); + + // Connect to the server + cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel(); + assertTrue(messageLatch.await(5, SECONDS)); + assertFalse(cc.isOpen()); + } finally { + closeChannel(cc); + closeChannel(sc); + } + } finally { + data.release(); + } + } + @Test public void testWriteInWritePromiseCompletePreservesOrder() throws InterruptedException { Bootstrap cb = new Bootstrap(); @@ -620,85 +687,6 @@ public class LocalChannelTest { } } - @Test - public void testClosePeerInWritePromiseCompleteSameEventLoopPreservesOrder() throws InterruptedException { - Bootstrap cb = new Bootstrap(); - ServerBootstrap sb = new ServerBootstrap(); - final CountDownLatch messageLatch = new CountDownLatch(2); - final CountDownLatch serverChannelLatch = new CountDownLatch(1); - final ByteBuf data = Unpooled.wrappedBuffer(new byte[1024]); - final AtomicReference serverChannelRef = new AtomicReference(); - - try { - cb.group(sharedGroup) - .channel(LocalChannel.class) - .handler(new TestHandler()); - - sb.group(sharedGroup) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (msg.equals(data)) { - ReferenceCountUtil.safeRelease(msg); - messageLatch.countDown(); - } else { - super.channelRead(ctx, msg); - } - } - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - messageLatch.countDown(); - super.channelInactive(ctx); - } - }); - serverChannelRef.set(ch); - serverChannelLatch.countDown(); - } - }); - - Channel sc = null; - Channel cc = null; - try { - // Start server - sc = sb.bind(TEST_ADDRESS).syncUninterruptibly().channel(); - - // Connect to the server - cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel(); - - assertTrue(serverChannelLatch.await(5, SECONDS)); - - final Channel ccCpy = cc; - // Make sure a write operation is executed in the eventloop - cc.pipeline().lastContext().executor().execute(new Runnable() { - @Override - public void run() { - ChannelPromise promise = ccCpy.newPromise(); - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - serverChannelRef.get().close(); - } - }); - ccCpy.writeAndFlush(data.retainedDuplicate(), promise); - } - }); - - assertTrue(messageLatch.await(5, SECONDS)); - assertFalse(cc.isOpen()); - assertFalse(serverChannelRef.get().isOpen()); - } finally { - closeChannel(cc); - closeChannel(sc); - } - } finally { - data.release(); - } - } - @Test public void testWriteWhilePeerIsClosedReleaseObjectAndFailPromise() throws InterruptedException { Bootstrap cb = new Bootstrap();