From e3f1416478f7903e51209630385c691e6e44105f Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Fri, 28 Aug 2015 09:46:35 -0700 Subject: [PATCH] LocalChannelWrite event sequencing issue Motivation: https://github.com/netty/netty/pull/4143 addressed a few ordering issues but an ordering issue still remained if the Promise for a write completes, and a listener of that promise does a write on a peer channel. The ordering was subject to how potentially 2 different executors would run a task, but it should be coordinated such that the first write is read first. Modifications: - Keep track of the finishPeerRead task run on the executor if necessary and ensure it completes before current channel read occurs Result: Ordering of events for echo type situations is preserved. --- .../io/netty/channel/local/LocalChannel.java | 52 +++++++++++++++--- .../netty/channel/local/LocalChannelTest.java | 54 ++++++++----------- 2 files changed, 66 insertions(+), 40 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index 655dcb4a23..6d782d9c54 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -28,6 +28,7 @@ import io.netty.channel.EventLoop; import io.netty.channel.SingleThreadEventLoop; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.SingleThreadEventExecutor; +import io.netty.util.concurrent.Future; import io.netty.util.internal.InternalThreadLocalMap; import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.PlatformDependent; @@ -38,14 +39,16 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.ConnectionPendingException; import java.nio.channels.NotYetConnectedException; import java.util.Queue; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; /** * A {@link Channel} for the local transport. */ public class LocalChannel extends AbstractChannel { + @SuppressWarnings({ "rawtypes" }) + private static final AtomicReferenceFieldUpdater FINISH_READ_FUTURE_UPDATER; private static final ChannelMetadata METADATA = new ChannelMetadata(false); - private static final int MAX_READER_STACK_DEPTH = 8; private final ChannelConfig config = new DefaultChannelConfig(this); @@ -80,6 +83,18 @@ public class LocalChannel extends AbstractChannel { private volatile boolean readInProgress; private volatile boolean registerInProgress; private volatile boolean writeInProgress; + private volatile Future finishReadFuture; + + static { + @SuppressWarnings({ "rawtypes" }) + AtomicReferenceFieldUpdater finishReadFutureUpdater = + PlatformDependent.newAtomicReferenceFieldUpdater(LocalChannel.class, "finishReadFuture"); + if (finishReadFutureUpdater == null) { + finishReadFutureUpdater = + AtomicReferenceFieldUpdater.newUpdater(LocalChannel.class, Future.class, "finishReadFuture"); + } + FINISH_READ_FUTURE_UPDATER = finishReadFutureUpdater; + } public LocalChannel() { super(null); @@ -324,16 +339,37 @@ public class LocalChannel extends AbstractChannel { if (peer.eventLoop() == eventLoop() && !peer.writeInProgress) { finishPeerRead0(peer); } else { - peer.eventLoop().execute(new OneTimeTask() { - @Override - public void run() { - finishPeerRead0(peer); - } - }); + runFinishPeerReadTask(peer); } } - private static void finishPeerRead0(LocalChannel peer) { + private void runFinishPeerReadTask(final LocalChannel peer) { + // If the peer is writing, we must wait until after reads are completed for that peer before we can read. So + // we keep track of the task, and coordinate later that our read can't happen until the peer is done. + final Runnable finishPeerReadTask = new OneTimeTask() { + @Override + public void run() { + finishPeerRead0(peer); + } + }; + if (peer.writeInProgress) { + peer.finishReadFuture = peer.eventLoop().submit(finishPeerReadTask); + } else { + peer.eventLoop().execute(finishPeerReadTask); + } + } + + private void finishPeerRead0(LocalChannel peer) { + Future peerFinishReadFuture = peer.finishReadFuture; + if (peerFinishReadFuture != null) { + if (!peerFinishReadFuture.isDone()) { + runFinishPeerReadTask(peer); + return; + } else { + // Lazy unset to make sure we don't prematurely unset it while scheduling a new task. + FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, peerFinishReadFuture, null); + } + } ChannelPipeline peerPipeline = peer.pipeline(); if (peer.readInProgress) { peer.readInProgress = false; diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java index 65d8777819..ca8558af40 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java @@ -58,7 +58,7 @@ public class LocalChannelTest { private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannelTest.class); - private static final String LOCAL_ADDR_ID = "test.id"; + private static final LocalAddress TEST_ADDRESS = new LocalAddress("test.id"); private static EventLoopGroup group1; private static EventLoopGroup group2; @@ -84,7 +84,6 @@ public class LocalChannelTest { @Test public void testLocalAddressReuse() throws Exception { for (int i = 0; i < 2; i ++) { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); Bootstrap cb = new Bootstrap(); ServerBootstrap sb = new ServerBootstrap(); @@ -105,11 +104,11 @@ public class LocalChannelTest { Channel cc = null; try { // Start server - sc = sb.bind(addr).sync().channel(); + sc = sb.bind(TEST_ADDRESS).sync().channel(); final CountDownLatch latch = new CountDownLatch(1); // Connect to the server - cc = cb.connect(addr).sync().channel(); + cc = cb.connect(sc.localAddress()).sync().channel(); final Channel ccCpy = cc; cc.eventLoop().execute(new Runnable() { @Override @@ -128,7 +127,7 @@ public class LocalChannelTest { assertNull(String.format( "Expected null, got channel '%s' for local address '%s'", - LocalChannelRegistry.get(addr), addr), LocalChannelRegistry.get(addr)); + LocalChannelRegistry.get(TEST_ADDRESS), TEST_ADDRESS), LocalChannelRegistry.get(TEST_ADDRESS)); } finally { closeChannel(cc); closeChannel(sc); @@ -138,7 +137,6 @@ public class LocalChannelTest { @Test public void testWriteFailsFastOnClosedChannel() throws Exception { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); Bootstrap cb = new Bootstrap(); ServerBootstrap sb = new ServerBootstrap(); @@ -159,10 +157,10 @@ public class LocalChannelTest { Channel cc = null; try { // Start server - sc = sb.bind(addr).sync().channel(); + sc = sb.bind(TEST_ADDRESS).sync().channel(); // Connect to the server - cc = cb.connect(addr).sync().channel(); + cc = cb.connect(sc.localAddress()).sync().channel(); // Close the channel and write something. cc.close().sync(); @@ -188,7 +186,6 @@ public class LocalChannelTest { @Test public void testServerCloseChannelSameEventLoop() throws Exception { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); final CountDownLatch latch = new CountDownLatch(1); ServerBootstrap sb = new ServerBootstrap() .group(group2) @@ -203,7 +200,7 @@ public class LocalChannelTest { Channel sc = null; Channel cc = null; try { - sc = sb.bind(addr).sync().channel(); + sc = sb.bind(TEST_ADDRESS).sync().channel(); Bootstrap b = new Bootstrap() .group(group2) @@ -214,7 +211,7 @@ public class LocalChannelTest { // discard } }); - cc = b.connect(addr).sync().channel(); + cc = b.connect(sc.localAddress()).sync().channel(); cc.writeAndFlush(new Object()); assertTrue(latch.await(5, SECONDS)); } finally { @@ -225,7 +222,6 @@ public class LocalChannelTest { @Test public void localChannelRaceCondition() throws Exception { - final LocalAddress address = new LocalAddress(LOCAL_ADDR_ID); final CountDownLatch closeLatch = new CountDownLatch(1); final EventLoopGroup clientGroup = new LocalEventLoopGroup(1) { @Override @@ -270,7 +266,7 @@ public class LocalChannelTest { closeLatch.countDown(); } }). - bind(address). + bind(TEST_ADDRESS). sync().channel(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(clientGroup). @@ -281,7 +277,7 @@ public class LocalChannelTest { /* Do nothing */ } }); - ChannelFuture future = bootstrap.connect(address); + ChannelFuture future = bootstrap.connect(sc.localAddress()); assertTrue("Connection should finish, not time out", future.await(200)); cc = future.channel(); } finally { @@ -293,7 +289,6 @@ public class LocalChannelTest { @Test public void testReRegister() { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); Bootstrap cb = new Bootstrap(); ServerBootstrap sb = new ServerBootstrap(); @@ -314,10 +309,10 @@ public class LocalChannelTest { Channel cc = null; try { // Start server - sc = sb.bind(addr).syncUninterruptibly().channel(); + sc = sb.bind(TEST_ADDRESS).syncUninterruptibly().channel(); // Connect to the server - cc = cb.connect(addr).syncUninterruptibly().channel(); + cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel(); cc.deregister().syncUninterruptibly(); } finally { @@ -328,7 +323,6 @@ public class LocalChannelTest { @Test public void testCloseInWritePromiseCompletePreservesOrder() throws InterruptedException { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); Bootstrap cb = new Bootstrap(); ServerBootstrap sb = new ServerBootstrap(); final CountDownLatch messageLatch = new CountDownLatch(2); @@ -362,10 +356,10 @@ public class LocalChannelTest { Channel cc = null; try { // Start server - sc = sb.bind(addr).syncUninterruptibly().channel(); + sc = sb.bind(TEST_ADDRESS).syncUninterruptibly().channel(); // Connect to the server - cc = cb.connect(addr).syncUninterruptibly().channel(); + cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel(); final Channel ccCpy = cc; // Make sure a write operation is executed in the eventloop @@ -396,7 +390,6 @@ public class LocalChannelTest { @Test public void testWriteInWritePromiseCompletePreservesOrder() throws InterruptedException { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); Bootstrap cb = new Bootstrap(); ServerBootstrap sb = new ServerBootstrap(); final CountDownLatch messageLatch = new CountDownLatch(2); @@ -427,10 +420,10 @@ public class LocalChannelTest { Channel cc = null; try { // Start server - sc = sb.bind(addr).syncUninterruptibly().channel(); + sc = sb.bind(TEST_ADDRESS).syncUninterruptibly().channel(); // Connect to the server - cc = cb.connect(addr).syncUninterruptibly().channel(); + cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel(); final Channel ccCpy = cc; // Make sure a write operation is executed in the eventloop @@ -461,7 +454,6 @@ public class LocalChannelTest { @Test public void testPeerWriteInWritePromiseCompleteDifferentEventLoopPreservesOrder() throws InterruptedException { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); Bootstrap cb = new Bootstrap(); ServerBootstrap sb = new ServerBootstrap(); final CountDownLatch messageLatch = new CountDownLatch(2); @@ -509,10 +501,10 @@ public class LocalChannelTest { Channel cc = null; try { // Start server - sc = sb.bind(addr).syncUninterruptibly().channel(); + sc = sb.bind(TEST_ADDRESS).syncUninterruptibly().channel(); // Connect to the server - cc = cb.connect(addr).syncUninterruptibly().channel(); + cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel(); assertTrue(serverChannelLatch.await(5, SECONDS)); final Channel ccCpy = cc; @@ -543,7 +535,6 @@ public class LocalChannelTest { @Test public void testPeerWriteInWritePromiseCompleteSameEventLoopPreservesOrder() throws InterruptedException { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); Bootstrap cb = new Bootstrap(); ServerBootstrap sb = new ServerBootstrap(); final CountDownLatch messageLatch = new CountDownLatch(2); @@ -592,10 +583,10 @@ public class LocalChannelTest { Channel cc = null; try { // Start server - sc = sb.bind(addr).syncUninterruptibly().channel(); + sc = sb.bind(TEST_ADDRESS).syncUninterruptibly().channel(); // Connect to the server - cc = cb.connect(addr).syncUninterruptibly().channel(); + cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel(); assertTrue(serverChannelLatch.await(5, SECONDS)); final Channel ccCpy = cc; @@ -629,7 +620,6 @@ public class LocalChannelTest { @Test public void testClosePeerInWritePromiseCompleteSameEventLoopPreservesOrder() throws InterruptedException { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); Bootstrap cb = new Bootstrap(); ServerBootstrap sb = new ServerBootstrap(); final CountDownLatch messageLatch = new CountDownLatch(2); @@ -672,10 +662,10 @@ public class LocalChannelTest { Channel cc = null; try { // Start server - sc = sb.bind(addr).syncUninterruptibly().channel(); + sc = sb.bind(TEST_ADDRESS).syncUninterruptibly().channel(); // Connect to the server - cc = cb.connect(addr).syncUninterruptibly().channel(); + cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel(); assertTrue(serverChannelLatch.await(5, SECONDS));