diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 274f248b15..55b67f0686 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -129,6 +129,7 @@ abstract class AbstractEpollChannel extends AbstractChannel { protected abstract AbstractEpollUnsafe newUnsafe(); protected abstract class AbstractEpollUnsafe extends AbstractUnsafe { + protected boolean readPending; /** * Called once EPOLLIN event is ready to be processed @@ -142,6 +143,13 @@ abstract class AbstractEpollChannel extends AbstractChannel { // NOOP } + @Override + public void beginRead() { + // Channel.read() or ChannelHandlerContext.read() was called + readPending = true; + super.beginRead(); + } + @Override protected void flush0() { // Flush immediately only when there's no pending flush. diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java index a1a8bddc33..2f42d4eeda 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java @@ -87,6 +87,7 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme } final class EpollServerSocketUnsafe extends AbstractEpollUnsafe { + @Override public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) { // Connect not supported by ServerChannel implementations @@ -99,35 +100,41 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme final ChannelPipeline pipeline = pipeline(); Throwable exception = null; try { - for (;;) { - int socketFd = Native.accept(fd); - if (socketFd == -1) { - // this means everything was handled for now - break; - } - try { - pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, - childEventLoopGroup().next(), socketFd)); - } catch (Throwable t) { - // keep on reading as we use epoll ET and need to consume everything from the socket - pipeline.fireChannelReadComplete(); - pipeline.fireExceptionCaught(t); + try { + for (;;) { + int socketFd = Native.accept(fd); + if (socketFd == -1) { + // this means everything was handled for now + break; + } + try { + readPending = false; + pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, + childEventLoopGroup().next(), socketFd)); + } catch (Throwable t) { + // keep on reading as we use epoll ET and need to consume everything from the socket + pipeline.fireChannelReadComplete(); + pipeline.fireExceptionCaught(t); + } } + } catch (Throwable t) { + exception = t; } + pipeline.fireChannelReadComplete(); - } catch (Throwable t) { - exception = t; - } - // This must be triggered before the channelReadComplete() to give the user the chance - // to call Channel.read() again. - // See https://github.com/netty/netty/issues/2254 - if (!config().isAutoRead()) { - clearEpollIn(); - } - pipeline.fireChannelReadComplete(); - - if (exception != null) { - pipeline.fireExceptionCaught(exception); + if (exception != null) { + pipeline.fireExceptionCaught(exception); + } + } finally { + // Check if there is a readPending which was not processed yet. + // This could be for two reasons: + // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method + // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method + // + // See https://github.com/netty/netty/issues/2254 + if (config.isAutoRead() && !readPending) { + clearEpollIn(); + } } } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 4cecbec50f..499c8e3102 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -373,6 +373,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { if (byteBuf != null) { if (byteBuf.isReadable()) { + readPending = false; pipeline.fireChannelRead(byteBuf); } else { byteBuf.release(); @@ -602,6 +603,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So close = localReadAmount < 0; break; } + readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; @@ -620,13 +622,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So break; } } - // This must be triggered before the channelReadComplete() to give the user the chance - // to call Channel.read() again. - // See https://github.com/netty/netty/issues/2254 - if (!config.isAutoRead()) { - clearEpollIn(); - } - pipeline.fireChannelReadComplete(); allocHandle.record(totalReadAmount); @@ -646,6 +641,16 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So } }); } + } finally { + // Check if there is a readPending which was not processed yet. + // This could be for two reasons: + // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method + // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method + // + // See https://github.com/netty/netty/issues/2254 + if (config.isAutoRead() && !readPending) { + clearEpollIn(); + } } } } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index a385aae72a..1b43af6bc6 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -56,21 +56,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { private final class NioByteUnsafe extends AbstractNioUnsafe { private RecvByteBufAllocator.Handle allocHandle; - private void removeReadOp() { - SelectionKey key = selectionKey(); - // Check first if the key is still valid as it may be canceled as part of the deregistration - // from the EventLoop - // See https://github.com/netty/netty/issues/2104 - if (!key.isValid()) { - return; - } - int interestOps = key.interestOps(); - if ((interestOps & readInterestOp) != 0) { - // only remove readInterestOp if needed - key.interestOps(interestOps & ~readInterestOp); - } - } - private void closeOnRead(ChannelPipeline pipeline) { SelectionKey key = selectionKey(); setInputShutdown(); @@ -84,19 +69,15 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } } - private void handleReadException(ChannelPipeline pipeline, ChannelConfig config, + private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { if (byteBuf != null) { if (byteBuf.isReadable()) { + readPending = false; pipeline.fireChannelRead(byteBuf); } else { byteBuf.release(); } - // This must be triggered before the channelReadComplete() to give the user the chance - // to call Channel.read() again. - if (!config.isAutoRead()) { - removeReadOp(); - } } pipeline.fireChannelReadComplete(); pipeline.fireExceptionCaught(cause); @@ -132,7 +113,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { close = localReadAmount < 0; break; } - + readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; @@ -155,12 +136,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { break; } } while (++ messages < maxMessagesPerRead); - // This must be triggered before the channelReadComplete() to give the user the chance - // to call Channel.read() again. - // See https://github.com/netty/netty/issues/2254 - if (!config.isAutoRead()) { - removeReadOp(); - } pipeline.fireChannelReadComplete(); allocHandle.record(totalReadAmount); @@ -170,7 +145,17 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { close = false; } } catch (Throwable t) { - handleReadException(pipeline, config, byteBuf, t, close); + handleReadException(pipeline, byteBuf, t, close); + } finally { + // Check if there is a readPending which was not processed yet. + // This could be for two reasons: + // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method + // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method + // + // See https://github.com/netty/netty/issues/2254 + if (config.isAutoRead() && !readPending) { + removeReadOp(); + } } } } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index 5c59ed6cea..2d865535d0 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -148,6 +148,30 @@ public abstract class AbstractNioChannel extends AbstractChannel { protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe { + protected boolean readPending; + + protected final void removeReadOp() { + SelectionKey key = selectionKey(); + // Check first if the key is still valid as it may be canceled as part of the deregistration + // from the EventLoop + // See https://github.com/netty/netty/issues/2104 + if (!key.isValid()) { + return; + } + int interestOps = key.interestOps(); + if ((interestOps & readInterestOp) != 0) { + // only remove readInterestOp if needed + key.interestOps(interestOps & ~readInterestOp); + } + } + + @Override + public void beginRead() { + // Channel.read() or ChannelHandlerContext.read() was called + readPending = true; + super.beginRead(); + } + @Override public SelectableChannel ch() { return javaChannel(); diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index e689df7d05..c5443cdd4e 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -47,15 +47,6 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { private final List readBuf = new ArrayList(); - private void removeReadOp() { - SelectionKey key = selectionKey(); - int interestOps = key.interestOps(); - if ((interestOps & readInterestOp) != 0) { - // only remove readInterestOp if needed - key.interestOps(interestOps & ~readInterestOp); - } - } - @Override public void read() { assert eventLoop().inEventLoop(); @@ -65,58 +56,63 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { final ChannelPipeline pipeline = pipeline(); boolean closed = false; Throwable exception = null; - try { - for (;;) { - int localRead = doReadMessages(readBuf); - if (localRead == 0) { - break; - } - if (localRead < 0) { - closed = true; - break; - } + try { + for (;;) { + int localRead = doReadMessages(readBuf); + if (localRead == 0) { + break; + } + if (localRead < 0) { + closed = true; + break; + } - // stop reading and remove op - if (!config.isAutoRead()) { - break; - } + // stop reading and remove op + if (!config.isAutoRead()) { + break; + } - if (readBuf.size() >= maxMessagesPerRead) { - break; + if (readBuf.size() >= maxMessagesPerRead) { + break; + } } + } catch (Throwable t) { + exception = t; } - } catch (Throwable t) { - exception = t; - } - - int size = readBuf.size(); - for (int i = 0; i < size; i ++) { - pipeline.fireChannelRead(readBuf.get(i)); - } - // This must be triggered before the channelReadComplete() to give the user the chance - // to call Channel.read() again. - // See https://github.com/netty/netty/issues/2254 - if (!config.isAutoRead()) { - removeReadOp(); - } - - readBuf.clear(); - pipeline.fireChannelReadComplete(); - - if (exception != null) { - if (exception instanceof IOException) { - // ServerChannel should not be closed even on IOException because it can often continue - // accepting incoming connections. (e.g. too many open files) - closed = !(AbstractNioMessageChannel.this instanceof ServerChannel); + readPending = false; + int size = readBuf.size(); + for (int i = 0; i < size; i ++) { + pipeline.fireChannelRead(readBuf.get(i)); } - pipeline.fireExceptionCaught(exception); - } + readBuf.clear(); + pipeline.fireChannelReadComplete(); - if (closed) { - if (isOpen()) { - close(voidPromise()); + if (exception != null) { + if (exception instanceof IOException) { + // ServerChannel should not be closed even on IOException because it can often continue + // accepting incoming connections. (e.g. too many open files) + closed = !(AbstractNioMessageChannel.this instanceof ServerChannel); + } + + pipeline.fireExceptionCaught(exception); + } + + if (closed) { + if (isOpen()) { + close(voidPromise()); + } + } + } finally { + // Check if there is a readPending which was not processed yet. + // This could be for two reasons: + // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method + // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method + // + // See https://github.com/netty/netty/issues/2254 + if (config.isAutoRead() && !readPending) { + removeReadOp(); } } }