[#2254] Correctly handle Channel.read() and ChannelHandlerContext.read()

This includes also when it is called from channelRead(...) and channelReadComplete(...) methods.
This commit is contained in:
Norman Maurer 2014-02-21 21:42:13 +01:00
parent 47fab2bfe8
commit b32316b33c
6 changed files with 139 additions and 113 deletions

View File

@ -129,6 +129,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
protected abstract AbstractEpollUnsafe newUnsafe();
protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
protected boolean readPending;
/**
* Called once EPOLLIN event is ready to be processed
@ -142,6 +143,13 @@ abstract class AbstractEpollChannel extends AbstractChannel {
// NOOP
}
@Override
public void beginRead() {
// Channel.read() or ChannelHandlerContext.read() was called
readPending = true;
super.beginRead();
}
@Override
protected void flush0() {
// Flush immediately only when there's no pending flush.

View File

@ -79,6 +79,7 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme
}
final class EpollServerSocketUnsafe extends AbstractEpollUnsafe {
@Override
public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) {
// Connect not supported by ServerChannel implementations
@ -91,33 +92,40 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme
final ChannelPipeline pipeline = pipeline();
Throwable exception = null;
try {
for (;;) {
int socketFd = Native.accept(fd);
if (socketFd == -1) {
// this means everything was handled for now
break;
}
try {
pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, socketFd));
} catch (Throwable t) {
// keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
try {
for (;;) {
int socketFd = Native.accept(fd);
if (socketFd == -1) {
// this means everything was handled for now
break;
}
try {
readPending = false;
pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, socketFd));
} catch (Throwable t) {
// keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
}
}
} catch (Throwable t) {
exception = t;
}
} catch (Throwable t) {
exception = t;
}
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
// See https://github.com/netty/netty/issues/2254
if (!config().isAutoRead()) {
clearEpollIn();
}
pipeline.fireChannelReadComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
pipeline.fireExceptionCaught(exception);
if (exception != null) {
pipeline.fireExceptionCaught(exception);
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (config.isAutoRead() && !readPending) {
clearEpollIn();
}
}
}
}

View File

@ -373,6 +373,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
@ -602,6 +603,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
close = localReadAmount < 0;
break;
}
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
@ -620,13 +622,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
break;
}
}
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead()) {
clearEpollIn();
}
pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
@ -646,6 +641,16 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
}
});
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (config.isAutoRead() && !readPending) {
clearEpollIn();
}
}
}
}

View File

@ -55,21 +55,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
private final class NioByteUnsafe extends AbstractNioUnsafe {
private RecvByteBufAllocator.Handle allocHandle;
private void removeReadOp() {
SelectionKey key = selectionKey();
// Check first if the key is still valid as it may be canceled as part of the deregistration
// from the EventLoop
// See https://github.com/netty/netty/issues/2104
if (!key.isValid()) {
return;
}
int interestOps = key.interestOps();
if ((interestOps & readInterestOp) != 0) {
// only remove readInterestOp if needed
key.interestOps(interestOps & ~readInterestOp);
}
}
private void closeOnRead(ChannelPipeline pipeline) {
SelectionKey key = selectionKey();
setInputShutdown();
@ -83,19 +68,15 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
}
}
private void handleReadException(ChannelPipeline pipeline, ChannelConfig config,
private void handleReadException(ChannelPipeline pipeline,
ByteBuf byteBuf, Throwable cause, boolean close) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
if (!config.isAutoRead()) {
removeReadOp();
}
}
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
@ -131,7 +112,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
close = localReadAmount < 0;
break;
}
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
@ -154,12 +135,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
break;
}
} while (++ messages < maxMessagesPerRead);
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead()) {
removeReadOp();
}
pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
@ -169,7 +144,17 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
close = false;
}
} catch (Throwable t) {
handleReadException(pipeline, config, byteBuf, t, close);
handleReadException(pipeline, byteBuf, t, close);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (config.isAutoRead() && !readPending) {
removeReadOp();
}
}
}
}

View File

@ -148,6 +148,30 @@ public abstract class AbstractNioChannel extends AbstractChannel {
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
protected boolean readPending;
protected final void removeReadOp() {
SelectionKey key = selectionKey();
// Check first if the key is still valid as it may be canceled as part of the deregistration
// from the EventLoop
// See https://github.com/netty/netty/issues/2104
if (!key.isValid()) {
return;
}
int interestOps = key.interestOps();
if ((interestOps & readInterestOp) != 0) {
// only remove readInterestOp if needed
key.interestOps(interestOps & ~readInterestOp);
}
}
@Override
public void beginRead() {
// Channel.read() or ChannelHandlerContext.read() was called
readPending = true;
super.beginRead();
}
@Override
public SelectableChannel ch() {
return javaChannel();

View File

@ -48,15 +48,6 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
private final List<Object> readBuf = new ArrayList<Object>();
private void removeReadOp() {
SelectionKey key = selectionKey();
int interestOps = key.interestOps();
if ((interestOps & readInterestOp) != 0) {
// only remove readInterestOp if needed
key.interestOps(interestOps & ~readInterestOp);
}
}
@Override
public void read() {
assert eventLoop().inEventLoop();
@ -66,58 +57,63 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
Throwable exception = null;
try {
for (;;) {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
try {
for (;;) {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// stop reading and remove op
if (!config.isAutoRead()) {
break;
}
// stop reading and remove op
if (!config.isAutoRead()) {
break;
}
if (readBuf.size() >= maxMessagesPerRead) {
break;
if (readBuf.size() >= maxMessagesPerRead) {
break;
}
}
} catch (Throwable t) {
exception = t;
}
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead()) {
removeReadOp();
}
readBuf.clear();
pipeline.fireChannelReadComplete();
if (exception != null) {
if (exception instanceof IOException) {
// ServerChannel should not be closed even on IOException because it can often continue
// accepting incoming connections. (e.g. too many open files)
closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
readPending = false;
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
pipeline.fireExceptionCaught(exception);
}
readBuf.clear();
pipeline.fireChannelReadComplete();
if (closed) {
if (isOpen()) {
close(voidPromise());
if (exception != null) {
if (exception instanceof IOException) {
// ServerChannel should not be closed even on IOException because it can often continue
// accepting incoming connections. (e.g. too many open files)
closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
}
pipeline.fireExceptionCaught(exception);
}
if (closed) {
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (config.isAutoRead() && !readPending) {
removeReadOp();
}
}
}