NIO/EPOLL readPending set to false incorrectly

Motivation:
441aa4c575 introduced a bug in transport-native-epoll where readPending is set to false before a read is attempted, but this should happen before fireChannelRead is called. The NIO transport also only sets the readPending variable to false on the first read in the event loop. This means that if the user only calls read() on the first channelRead(..) the select loop will still listen for read events even if the user does not call read() on subsequent channelRead() or channelReadComplete() in the same event loop run. If the user only needs 2 channelRead() calls then by default they will may get 14 more channelRead() calls in the current event loop, and then 16 more when the event loop is woken up for a read event. This will also read data off the TCP stack and allow the peer to queue more data in the local RECV buffers.

Modifications:
- readPending should be set to false before each call to channelRead()
- make NIO readPending set to false consistent with EPOLL

Result:
NIO and EPOLL transport set readPending to false at correct times which don't read more data than intended by the user.
Fixes https://github.com/netty/netty/issues/5082
This commit is contained in:
Scott Mitchell 2016-04-02 16:48:07 -07:00
parent 7d60699a49
commit 9fb86a380d
8 changed files with 200 additions and 32 deletions

View File

@ -318,9 +318,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
*/ */
abstract void epollInReady(); abstract void epollInReady();
final void epollInReadAttempted() { final void epollInBefore() { maybeMoreDataToRead = false; }
readPending = maybeMoreDataToRead = false;
}
final void epollInFinally(ChannelConfig config) { final void epollInFinally(ChannelConfig config) {
maybeMoreDataToRead = allocHandle.maybeMoreDataToRead(); maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
@ -452,6 +450,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
protected final void clearEpollIn0() { protected final void clearEpollIn0() {
assert eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
try { try {
readPending = false;
clearFlag(readFlag); clearFlag(readFlag);
} catch (IOException e) { } catch (IOException e) {
// When this happens there is something completely wrong with either the filedescriptor or epoll, // When this happens there is something completely wrong with either the filedescriptor or epoll,

View File

@ -107,17 +107,17 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
@Override @Override
void epollInReady() { void epollInReady() {
assert eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
final ChannelConfig config = config(); if (fd().isInputShutdown()) {
if (!readPending && !config.isAutoRead() || fd().isInputShutdown()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
clearEpollIn0(); clearEpollIn0();
return; return;
} }
final ChannelConfig config = config();
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET)); allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
allocHandle.reset(config); allocHandle.reset(config);
epollInBefore();
Throwable exception = null; Throwable exception = null;
try { try {
@ -126,7 +126,6 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
// lastBytesRead represents the fd. We use lastBytesRead because it must be set so that the // lastBytesRead represents the fd. We use lastBytesRead because it must be set so that the
// EpollRecvByteAllocatorHandle knows if it should try to read again or not when autoRead is // EpollRecvByteAllocatorHandle knows if it should try to read again or not when autoRead is
// enabled. // enabled.
epollInReadAttempted();
allocHandle.lastBytesRead(fd().accept(acceptedAddress)); allocHandle.lastBytesRead(fd().accept(acceptedAddress));
if (allocHandle.lastBytesRead() == -1) { if (allocHandle.lastBytesRead() == -1) {
// this means everything was handled for now // this means everything was handled for now
@ -135,6 +134,7 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
allocHandle.incMessagesRead(1); allocHandle.incMessagesRead(1);
int len = acceptedAddress[0]; int len = acceptedAddress[0];
readPending = false;
pipeline.fireChannelRead(newChildChannel(allocHandle.lastBytesRead(), acceptedAddress, 1, len)); pipeline.fireChannelRead(newChildChannel(allocHandle.lastBytesRead(), acceptedAddress, 1, len));
} while (allocHandle.continueReading()); } while (allocHandle.continueReading());
} catch (Throwable t) { } catch (Throwable t) {

View File

@ -664,6 +664,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
EpollRecvByteAllocatorHandle allocHandle) { EpollRecvByteAllocatorHandle allocHandle) {
if (byteBuf != null) { if (byteBuf != null) {
if (byteBuf.isReadable()) { if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
} else { } else {
byteBuf.release(); byteBuf.release();
@ -822,19 +823,18 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
@Override @Override
void epollInReady() { void epollInReady() {
final ChannelConfig config = config(); if (fd().isInputShutdown()) {
if (!readPending && !config.isAutoRead() || fd().isInputShutdown()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
clearEpollIn0(); clearEpollIn0();
return; return;
} }
final ChannelConfig config = config();
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET)); allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator(); final ByteBufAllocator allocator = config.getAllocator();
allocHandle.reset(config); allocHandle.reset(config);
epollInBefore();
ByteBuf byteBuf = null; ByteBuf byteBuf = null;
boolean close = false; boolean close = false;
@ -859,7 +859,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
// we use a direct buffer here as the native implementations only be able // we use a direct buffer here as the native implementations only be able
// to handle direct buffers. // to handle direct buffers.
byteBuf = allocHandle.allocate(allocator); byteBuf = allocHandle.allocate(allocator);
epollInReadAttempted();
allocHandle.lastBytesRead(doReadBytes(byteBuf)); allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) { if (allocHandle.lastBytesRead() <= 0) {
// nothing was read, release the buffer. // nothing was read, release the buffer.
@ -869,6 +868,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
break; break;
} }
allocHandle.incMessagesRead(1); allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
byteBuf = null; byteBuf = null;

View File

@ -523,18 +523,18 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
@Override @Override
void epollInReady() { void epollInReady() {
assert eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
DatagramChannelConfig config = config(); if (fd().isInputShutdown()) {
if (!readPending && !config.isAutoRead() || fd().isInputShutdown()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
clearEpollIn0(); clearEpollIn0();
return; return;
} }
DatagramChannelConfig config = config();
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET)); allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator(); final ByteBufAllocator allocator = config.getAllocator();
allocHandle.reset(config); allocHandle.reset(config);
epollInBefore();
Throwable exception = null; Throwable exception = null;
try { try {
@ -544,7 +544,6 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
data = allocHandle.allocate(allocator); data = allocHandle.allocate(allocator);
allocHandle.attemptedBytesRead(data.writableBytes()); allocHandle.attemptedBytesRead(data.writableBytes());
final DatagramSocketAddress remoteAddress; final DatagramSocketAddress remoteAddress;
epollInReadAttempted();
if (data.hasMemoryAddress()) { if (data.hasMemoryAddress()) {
// has a memory address so use optimized call // has a memory address so use optimized call
remoteAddress = fd().recvFromAddress(data.memoryAddress(), data.writerIndex(), remoteAddress = fd().recvFromAddress(data.memoryAddress(), data.writerIndex(),
@ -577,6 +576,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
int size = readBuf.size(); int size = readBuf.size();
for (int i = 0; i < size; i ++) { for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i)); pipeline.fireChannelRead(readBuf.get(i));
} }
readBuf.clear(); readBuf.clear();

View File

@ -148,24 +148,23 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i
} }
private void epollInReadFd() { private void epollInReadFd() {
final ChannelConfig config = config(); if (fd().isInputShutdown()) {
if (!readPending && !config.isAutoRead() || fd().isInputShutdown()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
clearEpollIn0(); clearEpollIn0();
return; return;
} }
final ChannelConfig config = config();
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET)); allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
allocHandle.reset(config); allocHandle.reset(config);
epollInBefore();
try { try {
readLoop: do { readLoop: do {
// lastBytesRead represents the fd. We use lastBytesRead because it must be set so that the // lastBytesRead represents the fd. We use lastBytesRead because it must be set so that the
// EpollRecvByteAllocatorHandle knows if it should try to read again or not when autoRead is // EpollRecvByteAllocatorHandle knows if it should try to read again or not when autoRead is
// enabled. // enabled.
epollInReadAttempted();
allocHandle.lastBytesRead(Native.recvFd(fd().intValue())); allocHandle.lastBytesRead(Native.recvFd(fd().intValue()));
switch(allocHandle.lastBytesRead()) { switch(allocHandle.lastBytesRead()) {
case 0: case 0:
@ -175,6 +174,7 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i
return; return;
default: default:
allocHandle.incMessagesRead(1); allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(new FileDescriptor(allocHandle.lastBytesRead())); pipeline.fireChannelRead(new FileDescriptor(allocHandle.lastBytesRead()));
break; break;
} }

View File

@ -151,6 +151,19 @@ public class EpollSocketChannelTest {
} }
} }
@Test
public void testReadPendingIsResetAfterEachRead() throws InterruptedException {
EventLoopGroup group = new EpollEventLoopGroup();
try {
runReadPendingTest(group, EpollServerSocketChannel.class, EpollSocketChannel.class,
new InetSocketAddress(0));
runReadPendingTest(group, EpollServerDomainSocketChannel.class, EpollDomainSocketChannel.class,
EpollSocketTestPermutation.newSocketAddress());
} finally {
group.shutdownGracefully();
}
}
private void runAutoReadTest(boolean readOutsideEventLoopThread, EventLoopGroup group, private void runAutoReadTest(boolean readOutsideEventLoopThread, EventLoopGroup group,
Class<? extends ServerChannel> serverChannelClass, Class<? extends ServerChannel> serverChannelClass,
Class<? extends Channel> channelClass, SocketAddress bindAddr) Class<? extends Channel> channelClass, SocketAddress bindAddr)
@ -214,6 +227,62 @@ public class EpollSocketChannelTest {
} }
} }
private void runReadPendingTest(EventLoopGroup group,
Class<? extends ServerChannel> serverChannelClass,
Class<? extends Channel> channelClass, SocketAddress bindAddr)
throws InterruptedException {
Channel serverChannel = null;
Channel clientChannel = null;
try {
ReadPendingInitializer serverInitializer = new ReadPendingInitializer();
ReadPendingInitializer clientInitializer = new ReadPendingInitializer();
ServerBootstrap sb = new ServerBootstrap();
sb.option(ChannelOption.SO_BACKLOG, 1024)
.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.option(ChannelOption.AUTO_READ, true)
.group(group)
.channel(serverChannelClass)
.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.childOption(ChannelOption.AUTO_READ, false)
// We intend to do 2 reads per read loop wakeup
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(2))
.childHandler(serverInitializer);
serverChannel = sb.bind(bindAddr).syncUninterruptibly().channel();
Bootstrap b = new Bootstrap()
.group(group)
.channel(channelClass)
.remoteAddress(serverChannel.localAddress())
.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.option(ChannelOption.AUTO_READ, false)
// We intend to do 2 reads per read loop wakeup
.option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(2))
.handler(clientInitializer);
clientChannel = b.connect().syncUninterruptibly().channel();
// 4 bytes means 2 read loops for TestNumReadsRecvByteBufAllocator
clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[4]));
// 4 bytes means 2 read loops for TestNumReadsRecvByteBufAllocator
assertTrue(serverInitializer.channelInitLatch.await(5, TimeUnit.SECONDS));
serverInitializer.channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[4]));
serverInitializer.channel.read();
serverInitializer.readPendingHandler.assertAllRead();
clientChannel.read();
clientInitializer.readPendingHandler.assertAllRead();
} finally {
if (serverChannel != null) {
serverChannel.close().syncUninterruptibly();
}
if (clientChannel != null) {
clientChannel.close().syncUninterruptibly();
}
}
}
private void runExceptionHandleFeedbackLoop(EventLoopGroup group, Class<? extends ServerChannel> serverChannelClass, private void runExceptionHandleFeedbackLoop(EventLoopGroup group, Class<? extends ServerChannel> serverChannelClass,
Class<? extends Channel> channelClass, SocketAddress bindAddr) throws InterruptedException { Class<? extends Channel> channelClass, SocketAddress bindAddr) throws InterruptedException {
Channel serverChannel = null; Channel serverChannel = null;
@ -254,6 +323,75 @@ public class EpollSocketChannelTest {
} }
} }
/**
* Designed to keep reading as long as autoread is enabled.
*/
private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator {
private final int numReads;
TestNumReadsRecvByteBufAllocator(int numReads) {
this.numReads = numReads;
}
@Override
public Handle newHandle() {
return new Handle() {
private ChannelConfig config;
private int attemptedBytesRead;
private int lastBytesRead;
private int numMessagesRead;
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}
@Override
public int guess() {
return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled.
}
@Override
public void reset(ChannelConfig config) {
this.config = config;
numMessagesRead = 0;
}
@Override
public void incMessagesRead(int numMessages) {
numMessagesRead += numMessages;
}
@Override
public void lastBytesRead(int bytes) {
lastBytesRead = bytes;
}
@Override
public int lastBytesRead() {
return lastBytesRead;
}
@Override
public void attemptedBytesRead(int bytes) {
attemptedBytesRead = bytes;
}
@Override
public int attemptedBytesRead() {
return attemptedBytesRead;
}
@Override
public boolean continueReading() {
return numMessagesRead < numReads;
}
@Override
public void readComplete() {
}
};
}
}
/** /**
* Designed to keep reading as long as autoread is enabled. * Designed to keep reading as long as autoread is enabled.
*/ */
@ -330,6 +468,19 @@ public class EpollSocketChannelTest {
} }
} }
private static class ReadPendingInitializer extends ChannelInitializer<Channel> {
final ReadPendingReadHandler readPendingHandler = new ReadPendingReadHandler();
final CountDownLatch channelInitLatch = new CountDownLatch(1);
volatile Channel channel;
@Override
protected void initChannel(Channel ch) throws Exception {
channel = ch;
ch.pipeline().addLast(readPendingHandler);
channelInitLatch.countDown();
}
}
private static class MyInitializer extends ChannelInitializer<Channel> { private static class MyInitializer extends ChannelInitializer<Channel> {
final ExceptionHandler exceptionHandler = new ExceptionHandler(); final ExceptionHandler exceptionHandler = new ExceptionHandler();
@Override @Override
@ -349,6 +500,34 @@ public class EpollSocketChannelTest {
} }
} }
private static final class ReadPendingReadHandler extends ChannelInboundHandlerAdapter {
private final AtomicInteger count = new AtomicInteger();
private final CountDownLatch latch = new CountDownLatch(1);
private final CountDownLatch latch2 = new CountDownLatch(2);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ReferenceCountUtil.release(msg);
if (count.incrementAndGet() == 1) {
// Call read the first time, to ensure it is not reset the second time.
ctx.read();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
latch.countDown();
latch2.countDown();
}
void assertAllRead() throws InterruptedException {
assertTrue(latch.await(5, TimeUnit.SECONDS));
// We should only do 1 read loop, because we only called read() on the first channelRead.
assertFalse(latch2.await(1, TimeUnit.SECONDS));
assertEquals(2, count.get());
}
}
private static final class AutoReadHandler extends ChannelInboundHandlerAdapter { private static final class AutoReadHandler extends ChannelInboundHandlerAdapter {
private final AtomicInteger count = new AtomicInteger(); private final AtomicInteger count = new AtomicInteger();
private final CountDownLatch latch = new CountDownLatch(1); private final CountDownLatch latch = new CountDownLatch(1);

View File

@ -98,7 +98,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
removeReadOp(); removeReadOp();
return; return;
} }
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator(); final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
@ -107,7 +106,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
ByteBuf byteBuf = null; ByteBuf byteBuf = null;
boolean close = false; boolean close = false;
try { try {
boolean needReadPendingReset = true;
do { do {
byteBuf = allocHandle.allocate(allocator); byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf)); allocHandle.lastBytesRead(doReadBytes(byteBuf));
@ -120,10 +118,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
} }
allocHandle.incMessagesRead(1); allocHandle.incMessagesRead(1);
if (needReadPendingReset) { setReadPending(false);
needReadPendingReset = false;
setReadPending(false);
}
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
byteBuf = null; byteBuf = null;
} while (allocHandle.continueReading()); } while (allocHandle.continueReading());

View File

@ -59,7 +59,6 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
removeReadOp(); removeReadOp();
return; return;
} }
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config); allocHandle.reset(config);
@ -68,7 +67,6 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
Throwable exception = null; Throwable exception = null;
try { try {
try { try {
boolean needReadPendingReset = true;
do { do {
int localRead = doReadMessages(readBuf); int localRead = doReadMessages(readBuf);
if (localRead == 0) { if (localRead == 0) {
@ -80,10 +78,6 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
} }
allocHandle.incMessagesRead(localRead); allocHandle.incMessagesRead(localRead);
if (needReadPendingReset) {
needReadPendingReset = false;
setReadPending(false);
}
} while (allocHandle.continueReading()); } while (allocHandle.continueReading());
} catch (Throwable t) { } catch (Throwable t) {
exception = t; exception = t;
@ -91,6 +85,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
int size = readBuf.size(); int size = readBuf.size();
for (int i = 0; i < size; i ++) { for (int i = 0; i < size; i ++) {
setReadPending(false);
pipeline.fireChannelRead(readBuf.get(i)); pipeline.fireChannelRead(readBuf.get(i));
} }
readBuf.clear(); readBuf.clear();