From b32316b33cad851a8e22b5bd4cbe1de2485f1416 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 21 Feb 2014 21:42:13 +0100 Subject: [PATCH] [#2254] Correctly handle Channel.read() and ChannelHandlerContext.read() This includes also when it is called from channelRead(...) and channelReadComplete(...) methods. --- .../channel/epoll/AbstractEpollChannel.java | 8 ++ .../epoll/EpollServerSocketChannel.java | 56 +++++----- .../channel/epoll/EpollSocketChannel.java | 19 ++-- .../channel/nio/AbstractNioByteChannel.java | 43 +++----- .../netty/channel/nio/AbstractNioChannel.java | 24 +++++ .../nio/AbstractNioMessageChannel.java | 102 +++++++++--------- 6 files changed, 139 insertions(+), 113 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index e2078d3880..d332986b6b 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -129,6 +129,7 @@ abstract class AbstractEpollChannel extends AbstractChannel { protected abstract AbstractEpollUnsafe newUnsafe(); protected abstract class AbstractEpollUnsafe extends AbstractUnsafe { + protected boolean readPending; /** * Called once EPOLLIN event is ready to be processed @@ -142,6 +143,13 @@ abstract class AbstractEpollChannel extends AbstractChannel { // NOOP } + @Override + public void beginRead() { + // Channel.read() or ChannelHandlerContext.read() was called + readPending = true; + super.beginRead(); + } + @Override protected void flush0() { // Flush immediately only when there's no pending flush. diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java index 935c3244eb..4a9cebd38d 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java @@ -79,6 +79,7 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme } final class EpollServerSocketUnsafe extends AbstractEpollUnsafe { + @Override public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) { // Connect not supported by ServerChannel implementations @@ -91,33 +92,40 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme final ChannelPipeline pipeline = pipeline(); Throwable exception = null; try { - for (;;) { - int socketFd = Native.accept(fd); - if (socketFd == -1) { - // this means everything was handled for now - break; - } - try { - pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, socketFd)); - } catch (Throwable t) { - // keep on reading as we use epoll ET and need to consume everything from the socket - pipeline.fireChannelReadComplete(); - pipeline.fireExceptionCaught(t); + try { + for (;;) { + int socketFd = Native.accept(fd); + if (socketFd == -1) { + // this means everything was handled for now + break; + } + try { + readPending = false; + pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, socketFd)); + } catch (Throwable t) { + // keep on reading as we use epoll ET and need to consume everything from the socket + pipeline.fireChannelReadComplete(); + pipeline.fireExceptionCaught(t); + } } + } catch (Throwable t) { + exception = t; } - } catch (Throwable t) { - exception = t; - } - // This must be triggered before the channelReadComplete() to give the user the chance - // to call Channel.read() again. - // See https://github.com/netty/netty/issues/2254 - if (!config().isAutoRead()) { - clearEpollIn(); - } - pipeline.fireChannelReadComplete(); + pipeline.fireChannelReadComplete(); - if (exception != null) { - pipeline.fireExceptionCaught(exception); + if (exception != null) { + pipeline.fireExceptionCaught(exception); + } + } finally { + // Check if there is a readPending which was not processed yet. + // This could be for two reasons: + // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method + // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method + // + // See https://github.com/netty/netty/issues/2254 + if (config.isAutoRead() && !readPending) { + clearEpollIn(); + } } } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 988b09bff1..edb12876dd 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -373,6 +373,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { if (byteBuf != null) { if (byteBuf.isReadable()) { + readPending = false; pipeline.fireChannelRead(byteBuf); } else { byteBuf.release(); @@ -602,6 +603,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So close = localReadAmount < 0; break; } + readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; @@ -620,13 +622,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So break; } } - // This must be triggered before the channelReadComplete() to give the user the chance - // to call Channel.read() again. - // See https://github.com/netty/netty/issues/2254 - if (!config.isAutoRead()) { - clearEpollIn(); - } - pipeline.fireChannelReadComplete(); allocHandle.record(totalReadAmount); @@ -646,6 +641,16 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So } }); } + } finally { + // Check if there is a readPending which was not processed yet. + // This could be for two reasons: + // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method + // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method + // + // See https://github.com/netty/netty/issues/2254 + if (config.isAutoRead() && !readPending) { + clearEpollIn(); + } } } } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 1bdec26edb..8815a23b90 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -55,21 +55,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { private final class NioByteUnsafe extends AbstractNioUnsafe { private RecvByteBufAllocator.Handle allocHandle; - private void removeReadOp() { - SelectionKey key = selectionKey(); - // Check first if the key is still valid as it may be canceled as part of the deregistration - // from the EventLoop - // See https://github.com/netty/netty/issues/2104 - if (!key.isValid()) { - return; - } - int interestOps = key.interestOps(); - if ((interestOps & readInterestOp) != 0) { - // only remove readInterestOp if needed - key.interestOps(interestOps & ~readInterestOp); - } - } - private void closeOnRead(ChannelPipeline pipeline) { SelectionKey key = selectionKey(); setInputShutdown(); @@ -83,19 +68,15 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } } - private void handleReadException(ChannelPipeline pipeline, ChannelConfig config, + private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { if (byteBuf != null) { if (byteBuf.isReadable()) { + readPending = false; pipeline.fireChannelRead(byteBuf); } else { byteBuf.release(); } - // This must be triggered before the channelReadComplete() to give the user the chance - // to call Channel.read() again. - if (!config.isAutoRead()) { - removeReadOp(); - } } pipeline.fireChannelReadComplete(); pipeline.fireExceptionCaught(cause); @@ -131,7 +112,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { close = localReadAmount < 0; break; } - + readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; @@ -154,12 +135,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { break; } } while (++ messages < maxMessagesPerRead); - // This must be triggered before the channelReadComplete() to give the user the chance - // to call Channel.read() again. - // See https://github.com/netty/netty/issues/2254 - if (!config.isAutoRead()) { - removeReadOp(); - } pipeline.fireChannelReadComplete(); allocHandle.record(totalReadAmount); @@ -169,7 +144,17 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { close = false; } } catch (Throwable t) { - handleReadException(pipeline, config, byteBuf, t, close); + handleReadException(pipeline, byteBuf, t, close); + } finally { + // Check if there is a readPending which was not processed yet. + // This could be for two reasons: + // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method + // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method + // + // See https://github.com/netty/netty/issues/2254 + if (config.isAutoRead() && !readPending) { + removeReadOp(); + } } } } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index 09a0d7262f..62961c11d4 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -148,6 +148,30 @@ public abstract class AbstractNioChannel extends AbstractChannel { protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe { + protected boolean readPending; + + protected final void removeReadOp() { + SelectionKey key = selectionKey(); + // Check first if the key is still valid as it may be canceled as part of the deregistration + // from the EventLoop + // See https://github.com/netty/netty/issues/2104 + if (!key.isValid()) { + return; + } + int interestOps = key.interestOps(); + if ((interestOps & readInterestOp) != 0) { + // only remove readInterestOp if needed + key.interestOps(interestOps & ~readInterestOp); + } + } + + @Override + public void beginRead() { + // Channel.read() or ChannelHandlerContext.read() was called + readPending = true; + super.beginRead(); + } + @Override public SelectableChannel ch() { return javaChannel(); diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index f3773feea1..d96200de50 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -48,15 +48,6 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { private final List readBuf = new ArrayList(); - private void removeReadOp() { - SelectionKey key = selectionKey(); - int interestOps = key.interestOps(); - if ((interestOps & readInterestOp) != 0) { - // only remove readInterestOp if needed - key.interestOps(interestOps & ~readInterestOp); - } - } - @Override public void read() { assert eventLoop().inEventLoop(); @@ -66,58 +57,63 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { final ChannelPipeline pipeline = pipeline(); boolean closed = false; Throwable exception = null; - try { - for (;;) { - int localRead = doReadMessages(readBuf); - if (localRead == 0) { - break; - } - if (localRead < 0) { - closed = true; - break; - } + try { + for (;;) { + int localRead = doReadMessages(readBuf); + if (localRead == 0) { + break; + } + if (localRead < 0) { + closed = true; + break; + } - // stop reading and remove op - if (!config.isAutoRead()) { - break; - } + // stop reading and remove op + if (!config.isAutoRead()) { + break; + } - if (readBuf.size() >= maxMessagesPerRead) { - break; + if (readBuf.size() >= maxMessagesPerRead) { + break; + } } + } catch (Throwable t) { + exception = t; } - } catch (Throwable t) { - exception = t; - } - - int size = readBuf.size(); - for (int i = 0; i < size; i ++) { - pipeline.fireChannelRead(readBuf.get(i)); - } - // This must be triggered before the channelReadComplete() to give the user the chance - // to call Channel.read() again. - // See https://github.com/netty/netty/issues/2254 - if (!config.isAutoRead()) { - removeReadOp(); - } - - readBuf.clear(); - pipeline.fireChannelReadComplete(); - - if (exception != null) { - if (exception instanceof IOException) { - // ServerChannel should not be closed even on IOException because it can often continue - // accepting incoming connections. (e.g. too many open files) - closed = !(AbstractNioMessageChannel.this instanceof ServerChannel); + readPending = false; + int size = readBuf.size(); + for (int i = 0; i < size; i ++) { + pipeline.fireChannelRead(readBuf.get(i)); } - pipeline.fireExceptionCaught(exception); - } + readBuf.clear(); + pipeline.fireChannelReadComplete(); - if (closed) { - if (isOpen()) { - close(voidPromise()); + if (exception != null) { + if (exception instanceof IOException) { + // ServerChannel should not be closed even on IOException because it can often continue + // accepting incoming connections. (e.g. too many open files) + closed = !(AbstractNioMessageChannel.this instanceof ServerChannel); + } + + pipeline.fireExceptionCaught(exception); + } + + if (closed) { + if (isOpen()) { + close(voidPromise()); + } + } + } finally { + // Check if there is a readPending which was not processed yet. + // This could be for two reasons: + // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method + // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method + // + // See https://github.com/netty/netty/issues/2254 + if (config.isAutoRead() && !readPending) { + removeReadOp(); } } }