diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index f22aca6df7..3ac9f72fb7 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -108,9 +108,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { if (allocHandle == null) { this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); } - if (!config.isAutoRead()) { - removeReadOp(); - } ByteBuf byteBuf = null; int messages = 0; @@ -139,6 +136,12 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } totalReadAmount += localReadAmount; + + // stop reading + if (!config.isAutoRead()) { + break; + } + if (localReadAmount < writable) { // Read less than what the buffer can hold, // which might mean we drained the recv buffer completely. @@ -155,6 +158,10 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close); + } finally { + if (!config.isAutoRead()) { + removeReadOp(); + } } } } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index 9a09956663..15a55f63b7 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -59,55 +59,62 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { @Override public void read() { assert eventLoop().inEventLoop(); - if (!config().isAutoRead()) { - removeReadOp(); - } - final ChannelConfig config = config(); - final int maxMessagesPerRead = config.getMaxMessagesPerRead(); - final boolean autoRead = config.isAutoRead(); - final ChannelPipeline pipeline = pipeline(); - boolean closed = false; - Throwable exception = null; + try { - for (;;) { - int localRead = doReadMessages(readBuf); - if (localRead == 0) { - break; - } - if (localRead < 0) { - closed = true; - break; - } + final int maxMessagesPerRead = config.getMaxMessagesPerRead(); + final ChannelPipeline pipeline = pipeline(); + boolean closed = false; + Throwable exception = null; + try { + for (;;) { + int localRead = doReadMessages(readBuf); + if (localRead == 0) { + break; + } + if (localRead < 0) { + closed = true; + break; + } - if (readBuf.size() >= maxMessagesPerRead | !autoRead) { - break; + // stop reading and remove op + if (!config.isAutoRead()) { + break; + } + + if (readBuf.size() >= maxMessagesPerRead) { + break; + } } - } - } catch (Throwable t) { - exception = t; - } - - int size = readBuf.size(); - for (int i = 0; i < size; i ++) { - pipeline.fireChannelRead(readBuf.get(i)); - } - readBuf.clear(); - pipeline.fireChannelReadComplete(); - - if (exception != null) { - if (exception instanceof IOException) { - // ServerChannel should not be closed even on IOException because it can often continue - // accepting incoming connections. (e.g. too many open files) - closed = !(AbstractNioMessageChannel.this instanceof ServerChannel); + } catch (Throwable t) { + exception = t; } - pipeline.fireExceptionCaught(exception); - } + int size = readBuf.size(); + for (int i = 0; i < size; i ++) { + pipeline.fireChannelRead(readBuf.get(i)); + } + readBuf.clear(); + pipeline.fireChannelReadComplete(); - if (closed) { - if (isOpen()) { - close(voidPromise()); + if (exception != null) { + if (exception instanceof IOException) { + // ServerChannel should not be closed even on IOException because it can often continue + // accepting incoming connections. (e.g. too many open files) + closed = !(AbstractNioMessageChannel.this instanceof ServerChannel); + } + + pipeline.fireExceptionCaught(exception); + } + + if (closed) { + if (isOpen()) { + close(voidPromise()); + } + } + } finally { + if (!config().isAutoRead()) { + removeReadOp(); } } } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java index 7fcd1045f5..f981698268 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java @@ -16,6 +16,7 @@ package io.netty.channel.oio; import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelPipeline; import java.io.IOException; @@ -37,11 +38,24 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel { protected void doRead() { final ChannelPipeline pipeline = pipeline(); boolean closed = false; + final ChannelConfig config = config(); + final int maxMessagesPerRead = config.getMaxMessagesPerRead(); + Throwable exception = null; try { - int localReadAmount = doReadMessages(readBuf); - if (localReadAmount < 0) { - closed = true; + for (;;) { + int localRead = doReadMessages(readBuf); + if (localRead == 0) { + break; + } + if (localRead < 0) { + closed = true; + break; + } + + if (readBuf.size() >= maxMessagesPerRead || !config.isAutoRead()) { + break; + } } } catch (Throwable t) { exception = t;