LocalChannelWrite event sequencing issue

Motivation:
https://github.com/netty/netty/pull/4143 addressed a few ordering issues but an ordering issue still remained if the Promise for a write completes, and a listener of that promise does a write on a peer channel. The ordering was subject to how potentially 2 different executors would run a task, but it should be coordinated such that the first write is read first.

Modifications:
- Keep track of the finishPeerRead task run on the executor if necessary and ensure it completes before current channel read occurs

Result:
Ordering of events for echo type situations is preserved.
This commit is contained in:
Scott Mitchell 2015-08-28 09:46:35 -07:00
parent 37eedb60fe
commit e3f1416478
2 changed files with 66 additions and 40 deletions

View File

@ -28,6 +28,7 @@ import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.SingleThreadEventExecutor; import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.InternalThreadLocalMap; import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
@ -38,14 +39,16 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.ConnectionPendingException; import java.nio.channels.ConnectionPendingException;
import java.nio.channels.NotYetConnectedException; import java.nio.channels.NotYetConnectedException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/** /**
* A {@link Channel} for the local transport. * A {@link Channel} for the local transport.
*/ */
public class LocalChannel extends AbstractChannel { public class LocalChannel extends AbstractChannel {
@SuppressWarnings({ "rawtypes" })
private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER;
private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private static final int MAX_READER_STACK_DEPTH = 8; private static final int MAX_READER_STACK_DEPTH = 8;
private final ChannelConfig config = new DefaultChannelConfig(this); private final ChannelConfig config = new DefaultChannelConfig(this);
@ -80,6 +83,18 @@ public class LocalChannel extends AbstractChannel {
private volatile boolean readInProgress; private volatile boolean readInProgress;
private volatile boolean registerInProgress; private volatile boolean registerInProgress;
private volatile boolean writeInProgress; private volatile boolean writeInProgress;
private volatile Future<?> finishReadFuture;
static {
@SuppressWarnings({ "rawtypes" })
AtomicReferenceFieldUpdater<LocalChannel, Future> finishReadFutureUpdater =
PlatformDependent.newAtomicReferenceFieldUpdater(LocalChannel.class, "finishReadFuture");
if (finishReadFutureUpdater == null) {
finishReadFutureUpdater =
AtomicReferenceFieldUpdater.newUpdater(LocalChannel.class, Future.class, "finishReadFuture");
}
FINISH_READ_FUTURE_UPDATER = finishReadFutureUpdater;
}
public LocalChannel() { public LocalChannel() {
super(null); super(null);
@ -324,16 +339,37 @@ public class LocalChannel extends AbstractChannel {
if (peer.eventLoop() == eventLoop() && !peer.writeInProgress) { if (peer.eventLoop() == eventLoop() && !peer.writeInProgress) {
finishPeerRead0(peer); finishPeerRead0(peer);
} else { } else {
peer.eventLoop().execute(new OneTimeTask() { runFinishPeerReadTask(peer);
@Override
public void run() {
finishPeerRead0(peer);
}
});
} }
} }
private static void finishPeerRead0(LocalChannel peer) { private void runFinishPeerReadTask(final LocalChannel peer) {
// If the peer is writing, we must wait until after reads are completed for that peer before we can read. So
// we keep track of the task, and coordinate later that our read can't happen until the peer is done.
final Runnable finishPeerReadTask = new OneTimeTask() {
@Override
public void run() {
finishPeerRead0(peer);
}
};
if (peer.writeInProgress) {
peer.finishReadFuture = peer.eventLoop().submit(finishPeerReadTask);
} else {
peer.eventLoop().execute(finishPeerReadTask);
}
}
private void finishPeerRead0(LocalChannel peer) {
Future<?> peerFinishReadFuture = peer.finishReadFuture;
if (peerFinishReadFuture != null) {
if (!peerFinishReadFuture.isDone()) {
runFinishPeerReadTask(peer);
return;
} else {
// Lazy unset to make sure we don't prematurely unset it while scheduling a new task.
FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, peerFinishReadFuture, null);
}
}
ChannelPipeline peerPipeline = peer.pipeline(); ChannelPipeline peerPipeline = peer.pipeline();
if (peer.readInProgress) { if (peer.readInProgress) {
peer.readInProgress = false; peer.readInProgress = false;

View File

@ -58,7 +58,7 @@ public class LocalChannelTest {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannelTest.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannelTest.class);
private static final String LOCAL_ADDR_ID = "test.id"; private static final LocalAddress TEST_ADDRESS = new LocalAddress("test.id");
private static EventLoopGroup group1; private static EventLoopGroup group1;
private static EventLoopGroup group2; private static EventLoopGroup group2;
@ -84,7 +84,6 @@ public class LocalChannelTest {
@Test @Test
public void testLocalAddressReuse() throws Exception { public void testLocalAddressReuse() throws Exception {
for (int i = 0; i < 2; i ++) { for (int i = 0; i < 2; i ++) {
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap(); Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap(); ServerBootstrap sb = new ServerBootstrap();
@ -105,11 +104,11 @@ public class LocalChannelTest {
Channel cc = null; Channel cc = null;
try { try {
// Start server // Start server
sc = sb.bind(addr).sync().channel(); sc = sb.bind(TEST_ADDRESS).sync().channel();
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
// Connect to the server // Connect to the server
cc = cb.connect(addr).sync().channel(); cc = cb.connect(sc.localAddress()).sync().channel();
final Channel ccCpy = cc; final Channel ccCpy = cc;
cc.eventLoop().execute(new Runnable() { cc.eventLoop().execute(new Runnable() {
@Override @Override
@ -128,7 +127,7 @@ public class LocalChannelTest {
assertNull(String.format( assertNull(String.format(
"Expected null, got channel '%s' for local address '%s'", "Expected null, got channel '%s' for local address '%s'",
LocalChannelRegistry.get(addr), addr), LocalChannelRegistry.get(addr)); LocalChannelRegistry.get(TEST_ADDRESS), TEST_ADDRESS), LocalChannelRegistry.get(TEST_ADDRESS));
} finally { } finally {
closeChannel(cc); closeChannel(cc);
closeChannel(sc); closeChannel(sc);
@ -138,7 +137,6 @@ public class LocalChannelTest {
@Test @Test
public void testWriteFailsFastOnClosedChannel() throws Exception { public void testWriteFailsFastOnClosedChannel() throws Exception {
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap(); Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap(); ServerBootstrap sb = new ServerBootstrap();
@ -159,10 +157,10 @@ public class LocalChannelTest {
Channel cc = null; Channel cc = null;
try { try {
// Start server // Start server
sc = sb.bind(addr).sync().channel(); sc = sb.bind(TEST_ADDRESS).sync().channel();
// Connect to the server // Connect to the server
cc = cb.connect(addr).sync().channel(); cc = cb.connect(sc.localAddress()).sync().channel();
// Close the channel and write something. // Close the channel and write something.
cc.close().sync(); cc.close().sync();
@ -188,7 +186,6 @@ public class LocalChannelTest {
@Test @Test
public void testServerCloseChannelSameEventLoop() throws Exception { public void testServerCloseChannelSameEventLoop() throws Exception {
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
ServerBootstrap sb = new ServerBootstrap() ServerBootstrap sb = new ServerBootstrap()
.group(group2) .group(group2)
@ -203,7 +200,7 @@ public class LocalChannelTest {
Channel sc = null; Channel sc = null;
Channel cc = null; Channel cc = null;
try { try {
sc = sb.bind(addr).sync().channel(); sc = sb.bind(TEST_ADDRESS).sync().channel();
Bootstrap b = new Bootstrap() Bootstrap b = new Bootstrap()
.group(group2) .group(group2)
@ -214,7 +211,7 @@ public class LocalChannelTest {
// discard // discard
} }
}); });
cc = b.connect(addr).sync().channel(); cc = b.connect(sc.localAddress()).sync().channel();
cc.writeAndFlush(new Object()); cc.writeAndFlush(new Object());
assertTrue(latch.await(5, SECONDS)); assertTrue(latch.await(5, SECONDS));
} finally { } finally {
@ -225,7 +222,6 @@ public class LocalChannelTest {
@Test @Test
public void localChannelRaceCondition() throws Exception { public void localChannelRaceCondition() throws Exception {
final LocalAddress address = new LocalAddress(LOCAL_ADDR_ID);
final CountDownLatch closeLatch = new CountDownLatch(1); final CountDownLatch closeLatch = new CountDownLatch(1);
final EventLoopGroup clientGroup = new LocalEventLoopGroup(1) { final EventLoopGroup clientGroup = new LocalEventLoopGroup(1) {
@Override @Override
@ -270,7 +266,7 @@ public class LocalChannelTest {
closeLatch.countDown(); closeLatch.countDown();
} }
}). }).
bind(address). bind(TEST_ADDRESS).
sync().channel(); sync().channel();
Bootstrap bootstrap = new Bootstrap(); Bootstrap bootstrap = new Bootstrap();
bootstrap.group(clientGroup). bootstrap.group(clientGroup).
@ -281,7 +277,7 @@ public class LocalChannelTest {
/* Do nothing */ /* Do nothing */
} }
}); });
ChannelFuture future = bootstrap.connect(address); ChannelFuture future = bootstrap.connect(sc.localAddress());
assertTrue("Connection should finish, not time out", future.await(200)); assertTrue("Connection should finish, not time out", future.await(200));
cc = future.channel(); cc = future.channel();
} finally { } finally {
@ -293,7 +289,6 @@ public class LocalChannelTest {
@Test @Test
public void testReRegister() { public void testReRegister() {
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap(); Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap(); ServerBootstrap sb = new ServerBootstrap();
@ -314,10 +309,10 @@ public class LocalChannelTest {
Channel cc = null; Channel cc = null;
try { try {
// Start server // Start server
sc = sb.bind(addr).syncUninterruptibly().channel(); sc = sb.bind(TEST_ADDRESS).syncUninterruptibly().channel();
// Connect to the server // Connect to the server
cc = cb.connect(addr).syncUninterruptibly().channel(); cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel();
cc.deregister().syncUninterruptibly(); cc.deregister().syncUninterruptibly();
} finally { } finally {
@ -328,7 +323,6 @@ public class LocalChannelTest {
@Test @Test
public void testCloseInWritePromiseCompletePreservesOrder() throws InterruptedException { public void testCloseInWritePromiseCompletePreservesOrder() throws InterruptedException {
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap(); Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap(); ServerBootstrap sb = new ServerBootstrap();
final CountDownLatch messageLatch = new CountDownLatch(2); final CountDownLatch messageLatch = new CountDownLatch(2);
@ -362,10 +356,10 @@ public class LocalChannelTest {
Channel cc = null; Channel cc = null;
try { try {
// Start server // Start server
sc = sb.bind(addr).syncUninterruptibly().channel(); sc = sb.bind(TEST_ADDRESS).syncUninterruptibly().channel();
// Connect to the server // Connect to the server
cc = cb.connect(addr).syncUninterruptibly().channel(); cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel();
final Channel ccCpy = cc; final Channel ccCpy = cc;
// Make sure a write operation is executed in the eventloop // Make sure a write operation is executed in the eventloop
@ -396,7 +390,6 @@ public class LocalChannelTest {
@Test @Test
public void testWriteInWritePromiseCompletePreservesOrder() throws InterruptedException { public void testWriteInWritePromiseCompletePreservesOrder() throws InterruptedException {
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap(); Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap(); ServerBootstrap sb = new ServerBootstrap();
final CountDownLatch messageLatch = new CountDownLatch(2); final CountDownLatch messageLatch = new CountDownLatch(2);
@ -427,10 +420,10 @@ public class LocalChannelTest {
Channel cc = null; Channel cc = null;
try { try {
// Start server // Start server
sc = sb.bind(addr).syncUninterruptibly().channel(); sc = sb.bind(TEST_ADDRESS).syncUninterruptibly().channel();
// Connect to the server // Connect to the server
cc = cb.connect(addr).syncUninterruptibly().channel(); cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel();
final Channel ccCpy = cc; final Channel ccCpy = cc;
// Make sure a write operation is executed in the eventloop // Make sure a write operation is executed in the eventloop
@ -461,7 +454,6 @@ public class LocalChannelTest {
@Test @Test
public void testPeerWriteInWritePromiseCompleteDifferentEventLoopPreservesOrder() throws InterruptedException { public void testPeerWriteInWritePromiseCompleteDifferentEventLoopPreservesOrder() throws InterruptedException {
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap(); Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap(); ServerBootstrap sb = new ServerBootstrap();
final CountDownLatch messageLatch = new CountDownLatch(2); final CountDownLatch messageLatch = new CountDownLatch(2);
@ -509,10 +501,10 @@ public class LocalChannelTest {
Channel cc = null; Channel cc = null;
try { try {
// Start server // Start server
sc = sb.bind(addr).syncUninterruptibly().channel(); sc = sb.bind(TEST_ADDRESS).syncUninterruptibly().channel();
// Connect to the server // Connect to the server
cc = cb.connect(addr).syncUninterruptibly().channel(); cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel();
assertTrue(serverChannelLatch.await(5, SECONDS)); assertTrue(serverChannelLatch.await(5, SECONDS));
final Channel ccCpy = cc; final Channel ccCpy = cc;
@ -543,7 +535,6 @@ public class LocalChannelTest {
@Test @Test
public void testPeerWriteInWritePromiseCompleteSameEventLoopPreservesOrder() throws InterruptedException { public void testPeerWriteInWritePromiseCompleteSameEventLoopPreservesOrder() throws InterruptedException {
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap(); Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap(); ServerBootstrap sb = new ServerBootstrap();
final CountDownLatch messageLatch = new CountDownLatch(2); final CountDownLatch messageLatch = new CountDownLatch(2);
@ -592,10 +583,10 @@ public class LocalChannelTest {
Channel cc = null; Channel cc = null;
try { try {
// Start server // Start server
sc = sb.bind(addr).syncUninterruptibly().channel(); sc = sb.bind(TEST_ADDRESS).syncUninterruptibly().channel();
// Connect to the server // Connect to the server
cc = cb.connect(addr).syncUninterruptibly().channel(); cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel();
assertTrue(serverChannelLatch.await(5, SECONDS)); assertTrue(serverChannelLatch.await(5, SECONDS));
final Channel ccCpy = cc; final Channel ccCpy = cc;
@ -629,7 +620,6 @@ public class LocalChannelTest {
@Test @Test
public void testClosePeerInWritePromiseCompleteSameEventLoopPreservesOrder() throws InterruptedException { public void testClosePeerInWritePromiseCompleteSameEventLoopPreservesOrder() throws InterruptedException {
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap(); Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap(); ServerBootstrap sb = new ServerBootstrap();
final CountDownLatch messageLatch = new CountDownLatch(2); final CountDownLatch messageLatch = new CountDownLatch(2);
@ -672,10 +662,10 @@ public class LocalChannelTest {
Channel cc = null; Channel cc = null;
try { try {
// Start server // Start server
sc = sb.bind(addr).syncUninterruptibly().channel(); sc = sb.bind(TEST_ADDRESS).syncUninterruptibly().channel();
// Connect to the server // Connect to the server
cc = cb.connect(addr).syncUninterruptibly().channel(); cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel();
assertTrue(serverChannelLatch.await(5, SECONDS)); assertTrue(serverChannelLatch.await(5, SECONDS));