[#2254] Correctly handle Channel.read() and ChannelHandlerContext.read()

This includes also when it is called from channelRead(...) and channelReadComplete(...) methods.
This commit is contained in:
Norman Maurer 2014-02-21 21:42:13 +01:00
parent 60b830ba89
commit 52535a12f8
6 changed files with 140 additions and 115 deletions

View File

@ -129,6 +129,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
protected abstract AbstractEpollUnsafe newUnsafe(); protected abstract AbstractEpollUnsafe newUnsafe();
protected abstract class AbstractEpollUnsafe extends AbstractUnsafe { protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
protected boolean readPending;
/** /**
* Called once EPOLLIN event is ready to be processed * Called once EPOLLIN event is ready to be processed
@ -142,6 +143,13 @@ abstract class AbstractEpollChannel extends AbstractChannel {
// NOOP // NOOP
} }
@Override
public void beginRead() {
// Channel.read() or ChannelHandlerContext.read() was called
readPending = true;
super.beginRead();
}
@Override @Override
protected void flush0() { protected void flush0() {
// Flush immediately only when there's no pending flush. // Flush immediately only when there's no pending flush.

View File

@ -87,6 +87,7 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme
} }
final class EpollServerSocketUnsafe extends AbstractEpollUnsafe { final class EpollServerSocketUnsafe extends AbstractEpollUnsafe {
@Override @Override
public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) { public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) {
// Connect not supported by ServerChannel implementations // Connect not supported by ServerChannel implementations
@ -99,35 +100,41 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
Throwable exception = null; Throwable exception = null;
try { try {
for (;;) { try {
int socketFd = Native.accept(fd); for (;;) {
if (socketFd == -1) { int socketFd = Native.accept(fd);
// this means everything was handled for now if (socketFd == -1) {
break; // this means everything was handled for now
} break;
try { }
pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, try {
childEventLoopGroup().next(), socketFd)); readPending = false;
} catch (Throwable t) { pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this,
// keep on reading as we use epoll ET and need to consume everything from the socket childEventLoopGroup().next(), socketFd));
pipeline.fireChannelReadComplete(); } catch (Throwable t) {
pipeline.fireExceptionCaught(t); // keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
}
} }
} catch (Throwable t) {
exception = t;
} }
pipeline.fireChannelReadComplete();
} catch (Throwable t) { if (exception != null) {
exception = t; pipeline.fireExceptionCaught(exception);
} }
// This must be triggered before the channelReadComplete() to give the user the chance } finally {
// to call Channel.read() again. // Check if there is a readPending which was not processed yet.
// See https://github.com/netty/netty/issues/2254 // This could be for two reasons:
if (!config().isAutoRead()) { // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
clearEpollIn(); // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
} //
pipeline.fireChannelReadComplete(); // See https://github.com/netty/netty/issues/2254
if (config.isAutoRead() && !readPending) {
if (exception != null) { clearEpollIn();
pipeline.fireExceptionCaught(exception); }
} }
} }
} }

View File

@ -373,6 +373,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) {
if (byteBuf != null) { if (byteBuf != null) {
if (byteBuf.isReadable()) { if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
} else { } else {
byteBuf.release(); byteBuf.release();
@ -602,6 +603,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
close = localReadAmount < 0; close = localReadAmount < 0;
break; break;
} }
readPending = false;
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
byteBuf = null; byteBuf = null;
@ -620,13 +622,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
break; break;
} }
} }
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead()) {
clearEpollIn();
}
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount); allocHandle.record(totalReadAmount);
@ -646,6 +641,16 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
} }
}); });
} }
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (config.isAutoRead() && !readPending) {
clearEpollIn();
}
} }
} }
} }

View File

@ -56,21 +56,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
private final class NioByteUnsafe extends AbstractNioUnsafe { private final class NioByteUnsafe extends AbstractNioUnsafe {
private RecvByteBufAllocator.Handle allocHandle; private RecvByteBufAllocator.Handle allocHandle;
private void removeReadOp() {
SelectionKey key = selectionKey();
// Check first if the key is still valid as it may be canceled as part of the deregistration
// from the EventLoop
// See https://github.com/netty/netty/issues/2104
if (!key.isValid()) {
return;
}
int interestOps = key.interestOps();
if ((interestOps & readInterestOp) != 0) {
// only remove readInterestOp if needed
key.interestOps(interestOps & ~readInterestOp);
}
}
private void closeOnRead(ChannelPipeline pipeline) { private void closeOnRead(ChannelPipeline pipeline) {
SelectionKey key = selectionKey(); SelectionKey key = selectionKey();
setInputShutdown(); setInputShutdown();
@ -84,19 +69,15 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
} }
} }
private void handleReadException(ChannelPipeline pipeline, ChannelConfig config, private void handleReadException(ChannelPipeline pipeline,
ByteBuf byteBuf, Throwable cause, boolean close) { ByteBuf byteBuf, Throwable cause, boolean close) {
if (byteBuf != null) { if (byteBuf != null) {
if (byteBuf.isReadable()) { if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
} else { } else {
byteBuf.release(); byteBuf.release();
} }
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
if (!config.isAutoRead()) {
removeReadOp();
}
} }
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause); pipeline.fireExceptionCaught(cause);
@ -132,7 +113,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
close = localReadAmount < 0; close = localReadAmount < 0;
break; break;
} }
readPending = false;
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
byteBuf = null; byteBuf = null;
@ -155,12 +136,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
break; break;
} }
} while (++ messages < maxMessagesPerRead); } while (++ messages < maxMessagesPerRead);
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead()) {
removeReadOp();
}
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount); allocHandle.record(totalReadAmount);
@ -170,7 +145,17 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
close = false; close = false;
} }
} catch (Throwable t) { } catch (Throwable t) {
handleReadException(pipeline, config, byteBuf, t, close); handleReadException(pipeline, byteBuf, t, close);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (config.isAutoRead() && !readPending) {
removeReadOp();
}
} }
} }
} }

View File

@ -148,6 +148,30 @@ public abstract class AbstractNioChannel extends AbstractChannel {
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe { protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
protected boolean readPending;
protected final void removeReadOp() {
SelectionKey key = selectionKey();
// Check first if the key is still valid as it may be canceled as part of the deregistration
// from the EventLoop
// See https://github.com/netty/netty/issues/2104
if (!key.isValid()) {
return;
}
int interestOps = key.interestOps();
if ((interestOps & readInterestOp) != 0) {
// only remove readInterestOp if needed
key.interestOps(interestOps & ~readInterestOp);
}
}
@Override
public void beginRead() {
// Channel.read() or ChannelHandlerContext.read() was called
readPending = true;
super.beginRead();
}
@Override @Override
public SelectableChannel ch() { public SelectableChannel ch() {
return javaChannel(); return javaChannel();

View File

@ -47,15 +47,6 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
private final List<Object> readBuf = new ArrayList<Object>(); private final List<Object> readBuf = new ArrayList<Object>();
private void removeReadOp() {
SelectionKey key = selectionKey();
int interestOps = key.interestOps();
if ((interestOps & readInterestOp) != 0) {
// only remove readInterestOp if needed
key.interestOps(interestOps & ~readInterestOp);
}
}
@Override @Override
public void read() { public void read() {
assert eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
@ -65,58 +56,63 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
boolean closed = false; boolean closed = false;
Throwable exception = null; Throwable exception = null;
try { try {
for (;;) { try {
int localRead = doReadMessages(readBuf); for (;;) {
if (localRead == 0) { int localRead = doReadMessages(readBuf);
break; if (localRead == 0) {
} break;
if (localRead < 0) { }
closed = true; if (localRead < 0) {
break; closed = true;
} break;
}
// stop reading and remove op // stop reading and remove op
if (!config.isAutoRead()) { if (!config.isAutoRead()) {
break; break;
} }
if (readBuf.size() >= maxMessagesPerRead) { if (readBuf.size() >= maxMessagesPerRead) {
break; break;
}
} }
} catch (Throwable t) {
exception = t;
} }
} catch (Throwable t) { readPending = false;
exception = t; int size = readBuf.size();
} for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead()) {
removeReadOp();
}
readBuf.clear();
pipeline.fireChannelReadComplete();
if (exception != null) {
if (exception instanceof IOException) {
// ServerChannel should not be closed even on IOException because it can often continue
// accepting incoming connections. (e.g. too many open files)
closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
} }
pipeline.fireExceptionCaught(exception); readBuf.clear();
} pipeline.fireChannelReadComplete();
if (closed) { if (exception != null) {
if (isOpen()) { if (exception instanceof IOException) {
close(voidPromise()); // ServerChannel should not be closed even on IOException because it can often continue
// accepting incoming connections. (e.g. too many open files)
closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
}
pipeline.fireExceptionCaught(exception);
}
if (closed) {
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (config.isAutoRead() && !readPending) {
removeReadOp();
} }
} }
} }