Correctly handle autoRead == false when epoll LT is used

Motivation:

When epoll LT is used and autoRead == false when entering epollIn() we need to return without reading any data.

Modifications:

Correctly respect autoRead == false if using epoll LT.

Result:

Consistent and correct behaviour.
This commit is contained in:
Norman Maurer 2015-02-20 14:20:33 +01:00
parent 29a5bc9fb6
commit 556a3d5980
4 changed files with 47 additions and 18 deletions

View File

@ -16,6 +16,7 @@
package io.netty.channel.epoll; package io.netty.channel.epoll;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
@ -75,14 +76,22 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
@Override @Override
void epollInReady() { void epollInReady() {
assert eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
final ChannelConfig config = config();
if (!readPending && !edgeTriggered && !config.isAutoRead()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
clearEpollIn0();
return;
}
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
Throwable exception = null; Throwable exception = null;
try { try {
try { try {
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise. // if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
final int maxMessagesPerRead = edgeTriggered final int maxMessagesPerRead = edgeTriggered
? Integer.MAX_VALUE : config().getMaxMessagesPerRead(); ? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
int messages = 0; int messages = 0;
do { do {
int socketFd = Native.accept(fd().intValue()); int socketFd = Native.accept(fd().intValue());
@ -99,7 +108,7 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t); pipeline.fireExceptionCaught(t);
} finally { } finally {
if (!edgeTriggered && !config().isAutoRead()) { if (!edgeTriggered && !config.isAutoRead()) {
// This is not using EPOLLET so we can stop reading // This is not using EPOLLET so we can stop reading
// ASAP as we will get notified again later with // ASAP as we will get notified again later with
// pending data // pending data
@ -122,7 +131,7 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
// //
// See https://github.com/netty/netty/issues/2254 // See https://github.com/netty/netty/issues/2254
if (!config().isAutoRead() && !readPending) { if (!readPending && !config.isAutoRead()) {
clearEpollIn0(); clearEpollIn0();
} }
} }

View File

@ -579,6 +579,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
@Override @Override
void epollInReady() { void epollInReady() {
final ChannelConfig config = config(); final ChannelConfig config = config();
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
if (!readPending && !edgeTriggered && !config.isAutoRead()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
clearEpollIn0();
return;
}
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator(); final ByteBufAllocator allocator = config.getAllocator();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle; RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
@ -589,10 +597,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
ByteBuf byteBuf = null; ByteBuf byteBuf = null;
boolean close = false; boolean close = false;
try { try {
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise. // if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
final int maxMessagesPerRead = edgeTriggered final int maxMessagesPerRead = edgeTriggered
? Integer.MAX_VALUE : config().getMaxMessagesPerRead(); ? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
int messages = 0; int messages = 0;
int totalReadAmount = 0; int totalReadAmount = 0;
do { do {
@ -625,7 +632,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
// which might mean we drained the recv buffer completely. // which might mean we drained the recv buffer completely.
break; break;
} }
if (!edgeTriggered && !config().isAutoRead()) { if (!edgeTriggered && !config.isAutoRead()) {
// This is not using EPOLLET so we can stop reading // This is not using EPOLLET so we can stop reading
// ASAP as we will get notified again later with // ASAP as we will get notified again later with
// pending data // pending data
@ -659,7 +666,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
// //
// See https://github.com/netty/netty/issues/2254 // See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead() && !readPending) { if (!readPending && !config.isAutoRead()) {
clearEpollIn0(); clearEpollIn0();
} }
} }

View File

@ -18,6 +18,7 @@ package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.AddressedEnvelope; import io.netty.channel.AddressedEnvelope;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
@ -507,21 +508,27 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
@Override @Override
void epollInReady() { void epollInReady() {
assert eventLoop().inEventLoop();
DatagramChannelConfig config = config(); DatagramChannelConfig config = config();
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
if (!readPending && !edgeTriggered && !config.isAutoRead()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
clearEpollIn0();
return;
}
RecvByteBufAllocator.Handle allocHandle = this.allocHandle; RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) { if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
} }
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
Throwable exception = null; Throwable exception = null;
try { try {
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise. // if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
final int maxMessagesPerRead = edgeTriggered final int maxMessagesPerRead = edgeTriggered
? Integer.MAX_VALUE : config().getMaxMessagesPerRead(); ? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
int messages = 0; int messages = 0;
do { do {
ByteBuf data = null; ByteBuf data = null;
@ -558,7 +565,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
if (data != null) { if (data != null) {
data.release(); data.release();
} }
if (!edgeTriggered && !config().isAutoRead()) { if (!edgeTriggered && !config.isAutoRead()) {
// This is not using EPOLLET so we can stop reading // This is not using EPOLLET so we can stop reading
// ASAP as we will get notified again later with // ASAP as we will get notified again later with
// pending data // pending data
@ -585,7 +592,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
// //
// See https://github.com/netty/netty/issues/2254 // See https://github.com/netty/netty/issues/2254
if (!config().isAutoRead() && !readPending) { if (!readPending && !config.isAutoRead()) {
clearEpollIn(); clearEpollIn();
} }
} }

View File

@ -131,14 +131,20 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i
} }
private void epollInReadFd() { private void epollInReadFd() {
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
final ChannelConfig config = config(); final ChannelConfig config = config();
if (!readPending && !edgeTriggered && !config.isAutoRead()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
clearEpollIn0();
return;
}
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
try { try {
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise. // if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
final int maxMessagesPerRead = edgeTriggered final int maxMessagesPerRead = edgeTriggered
? Integer.MAX_VALUE : config().getMaxMessagesPerRead(); ? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
int messages = 0; int messages = 0;
do { do {
int socketFd = Native.recvFd(fd().intValue()); int socketFd = Native.recvFd(fd().intValue());
@ -158,7 +164,7 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t); pipeline.fireExceptionCaught(t);
} finally { } finally {
if (!edgeTriggered && !config().isAutoRead()) { if (!edgeTriggered && !config.isAutoRead()) {
// This is not using EPOLLET so we can stop reading // This is not using EPOLLET so we can stop reading
// ASAP as we will get notified again later with // ASAP as we will get notified again later with
// pending data // pending data
@ -187,7 +193,7 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
// //
// See https://github.com/netty/netty/issues/2254 // See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead() && !readPending) { if (!readPending && !config.isAutoRead()) {
clearEpollIn0(); clearEpollIn0();
} }
} }