From 4dc337a717f75dba6df0827079305dee83bd5d58 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 11 Feb 2014 18:08:40 +0100 Subject: [PATCH] Correctly respect isAutoRead() and make it consistent across OIO/NIO --- .../channel/nio/AbstractNioByteChannel.java | 13 ++- .../nio/AbstractNioMessageChannel.java | 91 ++++++++++--------- .../oio/AbstractOioMessageChannel.java | 20 +++- 3 files changed, 76 insertions(+), 48 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 481071cfc0..39211d176b 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -109,9 +109,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { if (allocHandle == null) { this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); } - if (!config.isAutoRead()) { - removeReadOp(); - } ByteBuf byteBuf = null; int messages = 0; @@ -140,6 +137,12 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } totalReadAmount += localReadAmount; + + // stop reading + if (!config.isAutoRead()) { + break; + } + if (localReadAmount < writable) { // Read less than what the buffer can hold, // which might mean we drained the recv buffer completely. @@ -156,6 +159,10 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close); + } finally { + if (!config.isAutoRead()) { + removeReadOp(); + } } } } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index b998658b1b..1fa46c547b 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -58,55 +58,62 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { @Override public void read() { assert eventLoop().inEventLoop(); - if (!config().isAutoRead()) { - removeReadOp(); - } - final ChannelConfig config = config(); - final int maxMessagesPerRead = config.getMaxMessagesPerRead(); - final boolean autoRead = config.isAutoRead(); - final ChannelPipeline pipeline = pipeline(); - boolean closed = false; - Throwable exception = null; + try { - for (;;) { - int localRead = doReadMessages(readBuf); - if (localRead == 0) { - break; - } - if (localRead < 0) { - closed = true; - break; - } + final int maxMessagesPerRead = config.getMaxMessagesPerRead(); + final ChannelPipeline pipeline = pipeline(); + boolean closed = false; + Throwable exception = null; + try { + for (;;) { + int localRead = doReadMessages(readBuf); + if (localRead == 0) { + break; + } + if (localRead < 0) { + closed = true; + break; + } - if (readBuf.size() >= maxMessagesPerRead | !autoRead) { - break; + // stop reading and remove op + if (!config.isAutoRead()) { + break; + } + + if (readBuf.size() >= maxMessagesPerRead) { + break; + } } - } - } catch (Throwable t) { - exception = t; - } - - int size = readBuf.size(); - for (int i = 0; i < size; i ++) { - pipeline.fireChannelRead(readBuf.get(i)); - } - readBuf.clear(); - pipeline.fireChannelReadComplete(); - - if (exception != null) { - if (exception instanceof IOException) { - // ServerChannel should not be closed even on IOException because it can often continue - // accepting incoming connections. (e.g. too many open files) - closed = !(AbstractNioMessageChannel.this instanceof ServerChannel); + } catch (Throwable t) { + exception = t; } - pipeline.fireExceptionCaught(exception); - } + int size = readBuf.size(); + for (int i = 0; i < size; i ++) { + pipeline.fireChannelRead(readBuf.get(i)); + } + readBuf.clear(); + pipeline.fireChannelReadComplete(); - if (closed) { - if (isOpen()) { - close(voidPromise()); + if (exception != null) { + if (exception instanceof IOException) { + // ServerChannel should not be closed even on IOException because it can often continue + // accepting incoming connections. (e.g. too many open files) + closed = !(AbstractNioMessageChannel.this instanceof ServerChannel); + } + + pipeline.fireExceptionCaught(exception); + } + + if (closed) { + if (isOpen()) { + close(voidPromise()); + } + } + } finally { + if (!config().isAutoRead()) { + removeReadOp(); } } } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java index 4898dff0a9..3e045d7486 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java @@ -16,6 +16,7 @@ package io.netty.channel.oio; import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoop; @@ -38,11 +39,24 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel { protected void doRead() { final ChannelPipeline pipeline = pipeline(); boolean closed = false; + final ChannelConfig config = config(); + final int maxMessagesPerRead = config.getMaxMessagesPerRead(); + Throwable exception = null; try { - int localReadAmount = doReadMessages(readBuf); - if (localReadAmount < 0) { - closed = true; + for (;;) { + int localRead = doReadMessages(readBuf); + if (localRead == 0) { + break; + } + if (localRead < 0) { + closed = true; + break; + } + + if (readBuf.size() >= maxMessagesPerRead || !config.isAutoRead()) { + break; + } } } catch (Throwable t) { exception = t;