Correctly respect isAutoRead() and make it consistent across OIO/NIO
This commit is contained in:
parent
a96eb96646
commit
4dc337a717
@ -109,9 +109,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
||||
if (allocHandle == null) {
|
||||
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
|
||||
}
|
||||
if (!config.isAutoRead()) {
|
||||
removeReadOp();
|
||||
}
|
||||
|
||||
ByteBuf byteBuf = null;
|
||||
int messages = 0;
|
||||
@ -140,6 +137,12 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
||||
}
|
||||
|
||||
totalReadAmount += localReadAmount;
|
||||
|
||||
// stop reading
|
||||
if (!config.isAutoRead()) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (localReadAmount < writable) {
|
||||
// Read less than what the buffer can hold,
|
||||
// which might mean we drained the recv buffer completely.
|
||||
@ -156,6 +159,10 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
handleReadException(pipeline, byteBuf, t, close);
|
||||
} finally {
|
||||
if (!config.isAutoRead()) {
|
||||
removeReadOp();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -58,55 +58,62 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
|
||||
@Override
|
||||
public void read() {
|
||||
assert eventLoop().inEventLoop();
|
||||
if (!config().isAutoRead()) {
|
||||
removeReadOp();
|
||||
}
|
||||
|
||||
final ChannelConfig config = config();
|
||||
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
|
||||
final boolean autoRead = config.isAutoRead();
|
||||
final ChannelPipeline pipeline = pipeline();
|
||||
boolean closed = false;
|
||||
Throwable exception = null;
|
||||
|
||||
try {
|
||||
for (;;) {
|
||||
int localRead = doReadMessages(readBuf);
|
||||
if (localRead == 0) {
|
||||
break;
|
||||
}
|
||||
if (localRead < 0) {
|
||||
closed = true;
|
||||
break;
|
||||
}
|
||||
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
|
||||
final ChannelPipeline pipeline = pipeline();
|
||||
boolean closed = false;
|
||||
Throwable exception = null;
|
||||
try {
|
||||
for (;;) {
|
||||
int localRead = doReadMessages(readBuf);
|
||||
if (localRead == 0) {
|
||||
break;
|
||||
}
|
||||
if (localRead < 0) {
|
||||
closed = true;
|
||||
break;
|
||||
}
|
||||
|
||||
if (readBuf.size() >= maxMessagesPerRead | !autoRead) {
|
||||
break;
|
||||
// stop reading and remove op
|
||||
if (!config.isAutoRead()) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (readBuf.size() >= maxMessagesPerRead) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
exception = t;
|
||||
}
|
||||
|
||||
int size = readBuf.size();
|
||||
for (int i = 0; i < size; i ++) {
|
||||
pipeline.fireChannelRead(readBuf.get(i));
|
||||
}
|
||||
readBuf.clear();
|
||||
pipeline.fireChannelReadComplete();
|
||||
|
||||
if (exception != null) {
|
||||
if (exception instanceof IOException) {
|
||||
// ServerChannel should not be closed even on IOException because it can often continue
|
||||
// accepting incoming connections. (e.g. too many open files)
|
||||
closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
|
||||
} catch (Throwable t) {
|
||||
exception = t;
|
||||
}
|
||||
|
||||
pipeline.fireExceptionCaught(exception);
|
||||
}
|
||||
int size = readBuf.size();
|
||||
for (int i = 0; i < size; i ++) {
|
||||
pipeline.fireChannelRead(readBuf.get(i));
|
||||
}
|
||||
readBuf.clear();
|
||||
pipeline.fireChannelReadComplete();
|
||||
|
||||
if (closed) {
|
||||
if (isOpen()) {
|
||||
close(voidPromise());
|
||||
if (exception != null) {
|
||||
if (exception instanceof IOException) {
|
||||
// ServerChannel should not be closed even on IOException because it can often continue
|
||||
// accepting incoming connections. (e.g. too many open files)
|
||||
closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
|
||||
}
|
||||
|
||||
pipeline.fireExceptionCaught(exception);
|
||||
}
|
||||
|
||||
if (closed) {
|
||||
if (isOpen()) {
|
||||
close(voidPromise());
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (!config().isAutoRead()) {
|
||||
removeReadOp();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,7 @@
|
||||
package io.netty.channel.oio;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.EventLoop;
|
||||
|
||||
@ -38,11 +39,24 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
|
||||
protected void doRead() {
|
||||
final ChannelPipeline pipeline = pipeline();
|
||||
boolean closed = false;
|
||||
final ChannelConfig config = config();
|
||||
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
|
||||
|
||||
Throwable exception = null;
|
||||
try {
|
||||
int localReadAmount = doReadMessages(readBuf);
|
||||
if (localReadAmount < 0) {
|
||||
closed = true;
|
||||
for (;;) {
|
||||
int localRead = doReadMessages(readBuf);
|
||||
if (localRead == 0) {
|
||||
break;
|
||||
}
|
||||
if (localRead < 0) {
|
||||
closed = true;
|
||||
break;
|
||||
}
|
||||
|
||||
if (readBuf.size() >= maxMessagesPerRead || !config.isAutoRead()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
exception = t;
|
||||
|
Loading…
Reference in New Issue
Block a user