Correctly respect isAutoRead() and make it consistent across OIO/NIO

This commit is contained in:
Norman Maurer 2014-02-11 18:08:40 +01:00
parent 7041a9238e
commit 80e6f9adf4
3 changed files with 76 additions and 48 deletions

View File

@ -108,9 +108,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
if (allocHandle == null) { if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
} }
if (!config.isAutoRead()) {
removeReadOp();
}
ByteBuf byteBuf = null; ByteBuf byteBuf = null;
int messages = 0; int messages = 0;
@ -139,6 +136,12 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
} }
totalReadAmount += localReadAmount; totalReadAmount += localReadAmount;
// stop reading
if (!config.isAutoRead()) {
break;
}
if (localReadAmount < writable) { if (localReadAmount < writable) {
// Read less than what the buffer can hold, // Read less than what the buffer can hold,
// which might mean we drained the recv buffer completely. // which might mean we drained the recv buffer completely.
@ -155,6 +158,10 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
} }
} catch (Throwable t) { } catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close); handleReadException(pipeline, byteBuf, t, close);
} finally {
if (!config.isAutoRead()) {
removeReadOp();
}
} }
} }
} }

View File

@ -59,55 +59,62 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
@Override @Override
public void read() { public void read() {
assert eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
if (!config().isAutoRead()) {
removeReadOp();
}
final ChannelConfig config = config(); final ChannelConfig config = config();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
final boolean autoRead = config.isAutoRead();
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
Throwable exception = null;
try { try {
for (;;) { final int maxMessagesPerRead = config.getMaxMessagesPerRead();
int localRead = doReadMessages(readBuf); final ChannelPipeline pipeline = pipeline();
if (localRead == 0) { boolean closed = false;
break; Throwable exception = null;
} try {
if (localRead < 0) { for (;;) {
closed = true; int localRead = doReadMessages(readBuf);
break; if (localRead == 0) {
} break;
}
if (localRead < 0) {
closed = true;
break;
}
if (readBuf.size() >= maxMessagesPerRead | !autoRead) { // stop reading and remove op
break; if (!config.isAutoRead()) {
break;
}
if (readBuf.size() >= maxMessagesPerRead) {
break;
}
} }
} } catch (Throwable t) {
} catch (Throwable t) { exception = t;
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();
if (exception != null) {
if (exception instanceof IOException) {
// ServerChannel should not be closed even on IOException because it can often continue
// accepting incoming connections. (e.g. too many open files)
closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
} }
pipeline.fireExceptionCaught(exception); int size = readBuf.size();
} for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();
if (closed) { if (exception != null) {
if (isOpen()) { if (exception instanceof IOException) {
close(voidPromise()); // ServerChannel should not be closed even on IOException because it can often continue
// accepting incoming connections. (e.g. too many open files)
closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
}
pipeline.fireExceptionCaught(exception);
}
if (closed) {
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!config().isAutoRead()) {
removeReadOp();
} }
} }
} }

View File

@ -16,6 +16,7 @@
package io.netty.channel.oio; package io.netty.channel.oio;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import java.io.IOException; import java.io.IOException;
@ -37,11 +38,24 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
protected void doRead() { protected void doRead() {
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
boolean closed = false; boolean closed = false;
final ChannelConfig config = config();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
Throwable exception = null; Throwable exception = null;
try { try {
int localReadAmount = doReadMessages(readBuf); for (;;) {
if (localReadAmount < 0) { int localRead = doReadMessages(readBuf);
closed = true; if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
if (readBuf.size() >= maxMessagesPerRead || !config.isAutoRead()) {
break;
}
} }
} catch (Throwable t) { } catch (Throwable t) {
exception = t; exception = t;