FIX: force a read operation for peer instead of self (#7454)

* FIX: force a read operation for peer instead of self

Motivation:
When A is in `writeInProgress` and call self close, A should
`finishPeerRead` for B(A' peer).

Modifications:
Call `finishPeerRead` with peer in `LocalChannel#doClose`

Result:
Clear confuse of code logic

* FIX: preserves order of close after write in same event loop

Motivation:
If client and server(client's peer channel) are in same event loop, client writes data to
server in `ChannelActive`. Server receives the data and write it
back. The client's read can't be triggered becasue client's
`ChannelActive` is not finished at this point and its `readInProgress`
is false. Then server closes itself, it will also close the client's
channel. And client has no chance to receive the data.

Modifications:
1. Add a test case to demonstrate the problem
2. When `doClose` peer, we always call
`peer.eventLoop().execute()` and `registerInProgress` is not needed.
3. Remove test case
`testClosePeerInWritePromiseCompleteSameEventLoopPreservesOrder`. This
test case can't pass becasue of this commit. IMHO, I think it is OK,
becasue it is reasonable that the client flushes the data to socket,
then server close the channel without received the data.
4. For mismatch test in SniClientTest, the client should receive server's alert before closed(caused by server's close)

Result:
The problem is gone.
This commit is contained in:
louxiu 2017-12-08 09:05:57 +08:00 committed by Scott Mitchell
parent 0cac1a6c8c
commit 805ac002e6
3 changed files with 94 additions and 116 deletions

View File

@ -34,7 +34,7 @@ import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import java.nio.channels.ClosedChannelException;
import javax.net.ssl.SSLException;
public class SniClientTest {
@ -67,7 +67,7 @@ public class SniClientTest {
SniClientJava8TestUtil.testSniClient(SslProvider.JDK, SslProvider.JDK, true);
}
@Test(timeout = 30000, expected = ClosedChannelException.class)
@Test(timeout = 30000, expected = SSLException.class)
public void testSniSNIMatcherDoesNotMatchClientJdkSslServerJdkSsl() throws Exception {
Assume.assumeTrue(PlatformDependent.javaVersion() >= 8);
SniClientJava8TestUtil.testSniClient(SslProvider.JDK, SslProvider.JDK, false);
@ -80,7 +80,7 @@ public class SniClientTest {
SniClientJava8TestUtil.testSniClient(SslProvider.OPENSSL, SslProvider.OPENSSL, true);
}
@Test(timeout = 30000, expected = ClosedChannelException.class)
@Test(timeout = 30000, expected = SSLException.class)
public void testSniSNIMatcherDoesNotMatchClientOpenSslServerOpenSsl() throws Exception {
Assume.assumeTrue(PlatformDependent.javaVersion() >= 8);
Assume.assumeTrue(OpenSsl.isAvailable());
@ -94,7 +94,7 @@ public class SniClientTest {
SniClientJava8TestUtil.testSniClient(SslProvider.JDK, SslProvider.OPENSSL, true);
}
@Test(timeout = 30000, expected = ClosedChannelException.class)
@Test(timeout = 30000, expected = SSLException.class)
public void testSniSNIMatcherDoesNotMatchClientJdkSslServerOpenSsl() throws Exception {
Assume.assumeTrue(PlatformDependent.javaVersion() >= 8);
Assume.assumeTrue(OpenSsl.isAvailable());
@ -108,7 +108,7 @@ public class SniClientTest {
SniClientJava8TestUtil.testSniClient(SslProvider.OPENSSL, SslProvider.JDK, true);
}
@Test(timeout = 30000, expected = ClosedChannelException.class)
@Test(timeout = 30000, expected = SSLException.class)
public void testSniSNIMatcherDoesNotMatchClientOpenSslServerJdkSsl() throws Exception {
Assume.assumeTrue(PlatformDependent.javaVersion() >= 8);
Assume.assumeTrue(OpenSsl.isAvailable());

View File

@ -91,7 +91,6 @@ public class LocalChannel extends AbstractChannel {
private volatile LocalAddress remoteAddress;
private volatile ChannelPromise connectPromise;
private volatile boolean readInProgress;
private volatile boolean registerInProgress;
private volatile boolean writeInProgress;
private volatile Future<?> finishReadFuture;
@ -172,13 +171,8 @@ public class LocalChannel extends AbstractChannel {
// See https://github.com/netty/netty/issues/2400
if (peer != null && parent() != null) {
// Store the peer in a local variable as it may be set to null if doClose() is called.
// Because of this we also set registerInProgress to true as we check for this in doClose() and make sure
// we delay the fireChannelInactive() to be fired after the fireChannelActive() and so keep the correct
// order of events.
//
// See https://github.com/netty/netty/issues/2144
final LocalChannel peer = this.peer;
registerInProgress = true;
state = State.CONNECTED;
peer.remoteAddress = parent() == null ? null : parent().localAddress();
@ -191,7 +185,6 @@ public class LocalChannel extends AbstractChannel {
peer.eventLoop().execute(new Runnable() {
@Override
public void run() {
registerInProgress = false;
ChannelPromise promise = peer.connectPromise;
// Only trigger fireChannelActive() if the promise was not null and was not completed yet.
@ -237,7 +230,9 @@ public class LocalChannel extends AbstractChannel {
state = State.CLOSED;
// Preserve order of event and force a read operation now before the close operation is processed.
finishPeerRead(this);
if (writeInProgress && peer != null) {
finishPeerRead(peer);
}
ChannelPromise promise = connectPromise;
if (promise != null) {
@ -249,34 +244,29 @@ public class LocalChannel extends AbstractChannel {
if (peer != null) {
this.peer = null;
// Need to execute the close in the correct EventLoop (see https://github.com/netty/netty/issues/1777).
// Also check if the registration was not done yet. In this case we submit the close to the EventLoop
// to make sure its run after the registration completes
// (see https://github.com/netty/netty/issues/2144).
// Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
// This ensures that if both channels are on the same event loop, the peer's channelInActive
// event is triggered *after* this peer's channelInActive event
EventLoop peerEventLoop = peer.eventLoop();
final boolean peerIsActive = peer.isActive();
if (peerEventLoop.inEventLoop() && !registerInProgress) {
peer.tryClose(peerIsActive);
} else {
try {
peerEventLoop.execute(new Runnable() {
@Override
public void run() {
peer.tryClose(peerIsActive);
}
});
} catch (Throwable cause) {
logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
this, peer, cause);
if (peerEventLoop.inEventLoop()) {
peer.releaseInboundBuffers();
} else {
// inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or
// rejects the close Runnable but give a best effort.
peer.close();
try {
peerEventLoop.execute(new Runnable() {
@Override
public void run() {
peer.tryClose(peerIsActive);
}
PlatformDependent.throwException(cause);
});
} catch (Throwable cause) {
logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
this, peer, cause);
if (peerEventLoop.inEventLoop()) {
peer.releaseInboundBuffers();
} else {
// inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or
// rejects the close Runnable but give a best effort.
peer.close();
}
PlatformDependent.throwException(cause);
}
}
} finally {

View File

@ -390,6 +390,73 @@ public class LocalChannelTest {
}
}
@Test
public void testCloseAfterWriteInSameEventLoopPreservesOrder() throws InterruptedException {
Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap();
final CountDownLatch messageLatch = new CountDownLatch(3);
final ByteBuf data = Unpooled.wrappedBuffer(new byte[1024]);
try {
cb.group(sharedGroup)
.channel(LocalChannel.class)
.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(data.retainedDuplicate());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (data.equals(msg)) {
ReferenceCountUtil.safeRelease(msg);
messageLatch.countDown();
} else {
super.channelRead(ctx, msg);
}
}
});
sb.group(sharedGroup)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (data.equals(msg)) {
messageLatch.countDown();
ctx.writeAndFlush(data);
ctx.close();
} else {
super.channelRead(ctx, msg);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
messageLatch.countDown();
super.channelInactive(ctx);
}
});
Channel sc = null;
Channel cc = null;
try {
// Start server
sc = sb.bind(TEST_ADDRESS).syncUninterruptibly().channel();
// Connect to the server
cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel();
assertTrue(messageLatch.await(5, SECONDS));
assertFalse(cc.isOpen());
} finally {
closeChannel(cc);
closeChannel(sc);
}
} finally {
data.release();
}
}
@Test
public void testWriteInWritePromiseCompletePreservesOrder() throws InterruptedException {
Bootstrap cb = new Bootstrap();
@ -620,85 +687,6 @@ public class LocalChannelTest {
}
}
@Test
public void testClosePeerInWritePromiseCompleteSameEventLoopPreservesOrder() throws InterruptedException {
Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap();
final CountDownLatch messageLatch = new CountDownLatch(2);
final CountDownLatch serverChannelLatch = new CountDownLatch(1);
final ByteBuf data = Unpooled.wrappedBuffer(new byte[1024]);
final AtomicReference<Channel> serverChannelRef = new AtomicReference<Channel>();
try {
cb.group(sharedGroup)
.channel(LocalChannel.class)
.handler(new TestHandler());
sb.group(sharedGroup)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg.equals(data)) {
ReferenceCountUtil.safeRelease(msg);
messageLatch.countDown();
} else {
super.channelRead(ctx, msg);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
messageLatch.countDown();
super.channelInactive(ctx);
}
});
serverChannelRef.set(ch);
serverChannelLatch.countDown();
}
});
Channel sc = null;
Channel cc = null;
try {
// Start server
sc = sb.bind(TEST_ADDRESS).syncUninterruptibly().channel();
// Connect to the server
cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel();
assertTrue(serverChannelLatch.await(5, SECONDS));
final Channel ccCpy = cc;
// Make sure a write operation is executed in the eventloop
cc.pipeline().lastContext().executor().execute(new Runnable() {
@Override
public void run() {
ChannelPromise promise = ccCpy.newPromise();
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
serverChannelRef.get().close();
}
});
ccCpy.writeAndFlush(data.retainedDuplicate(), promise);
}
});
assertTrue(messageLatch.await(5, SECONDS));
assertFalse(cc.isOpen());
assertFalse(serverChannelRef.get().isOpen());
} finally {
closeChannel(cc);
closeChannel(sc);
}
} finally {
data.release();
}
}
@Test
public void testWriteWhilePeerIsClosedReleaseObjectAndFailPromise() throws InterruptedException {
Bootstrap cb = new Bootstrap();