EPOLLRDHUP infinite loop

Motivation:
If  is enabled and a channel is half closed it is possible for the EPOLL event loop to get into an infinite loop by continuously being woken up on the EPOLLRDHUP event.

Modifications:
- Ensure that the EPOLLRDHUP event is unregistered for to prevent infinite loop.

Result:
1 less infinite loop.
This commit is contained in:
Scott Mitchell 2015-08-03 17:58:37 -07:00
parent c65ef4fed7
commit d2683c3911
3 changed files with 51 additions and 37 deletions

View File

@ -22,8 +22,10 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.AbstractChannel; import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.UnixChannel; import io.netty.channel.unix.UnixChannel;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
@ -41,6 +43,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
protected int flags = Native.EPOLLET; protected int flags = Native.EPOLLET;
protected volatile boolean active; protected volatile boolean active;
private volatile boolean inputShutdown;
AbstractEpollChannel(int fd, int flag) { AbstractEpollChannel(int fd, int flag) {
this(null, fd, flag, false); this(null, fd, flag, false);
@ -176,6 +179,10 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
loop.add(this); loop.add(this);
} }
protected final boolean isInputShutdown0() {
return inputShutdown;
}
@Override @Override
protected abstract AbstractEpollUnsafe newUnsafe(); protected abstract AbstractEpollUnsafe newUnsafe();
@ -307,8 +314,47 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
/** /**
* Called once EPOLLRDHUP event is ready to be processed * Called once EPOLLRDHUP event is ready to be processed
*/ */
void epollRdHupReady() { final void epollRdHupReady() {
// NOOP if (isActive()) {
// If it is still active, we need to call epollInReady as otherwise we may miss to
// read pending data from the underlying file descriptor.
// See https://github.com/netty/netty/issues/3709
epollInReady();
// Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event.
clearEpollRdHup();
}
// epollInReady may call this, but we should ensure that it gets called.
shutdownInput();
}
/**
* Clear the {@link Native#EPOLLRDHUP} flag from EPOLL, and close on failure.
*/
private void clearEpollRdHup() {
try {
clearFlag(Native.EPOLLRDHUP);
} catch (IOException e) {
pipeline().fireExceptionCaught(e);
close(voidPromise());
}
}
/**
* Shutdown the input side of the channel.
*/
void shutdownInput() {
if (!inputShutdown) { // Best effort check on volatile variable to prevent multiple shutdowns
inputShutdown = true;
if (isOpen()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
clearEpollIn0();
pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidPromise());
}
}
}
} }
@Override @Override

View File

@ -23,7 +23,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
@ -31,7 +30,6 @@ import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.DefaultFileRegion; import io.netty.channel.DefaultFileRegion;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.FileDescriptor;
import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.MpscLinkedQueueNode; import io.netty.util.internal.MpscLinkedQueueNode;
@ -70,7 +68,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
private SocketAddress requestedRemoteAddress; private SocketAddress requestedRemoteAddress;
private final Queue<SpliceInTask> spliceQueue = PlatformDependent.newMpscQueue(); private final Queue<SpliceInTask> spliceQueue = PlatformDependent.newMpscQueue();
private volatile boolean inputShutdown;
private volatile boolean outputShutdown; private volatile boolean outputShutdown;
// Lazy init these if we need to splice(...) // Lazy init these if we need to splice(...)
@ -506,10 +503,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
} }
protected boolean isInputShutdown0() {
return inputShutdown;
}
protected boolean isOutputShutdown0() { protected boolean isOutputShutdown0() {
return outputShutdown || !isActive(); return outputShutdown || !isActive();
} }
@ -594,18 +587,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
} }
class EpollStreamUnsafe extends AbstractEpollUnsafe { class EpollStreamUnsafe extends AbstractEpollUnsafe {
private void closeOnRead(ChannelPipeline pipeline) {
inputShutdown = true;
if (isOpen()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
clearEpollIn0();
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidPromise());
}
}
}
private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) {
if (byteBuf != null) { if (byteBuf != null) {
if (byteBuf.isReadable()) { if (byteBuf.isReadable()) {
@ -619,7 +600,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause); pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) { if (close || cause instanceof IOException) {
closeOnRead(pipeline); shutdownInput();
return true; return true;
} }
return false; return false;
@ -763,18 +744,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
} }
} }
@Override
void epollRdHupReady() {
if (isActive()) {
// If it is still active, we need to call epollInReady as otherwise we may miss to
// read pending data from the underlying file descriptor.
// See https://github.com/netty/netty/issues/3709
epollInReady();
} else {
closeOnRead(pipeline());
}
}
@Override @Override
protected EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) { protected EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) {
return new EpollRecvByteAllocatorStreamingHandle(handle, isFlagSet(Native.EPOLLET)); return new EpollRecvByteAllocatorStreamingHandle(handle, isFlagSet(Native.EPOLLET));
@ -848,7 +817,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
if (close) { if (close) {
closeOnRead(pipeline); shutdownInput();
close = false;
} }
} catch (Throwable t) { } catch (Throwable t) {
boolean closed = handleReadException(pipeline, byteBuf, t, close); boolean closed = handleReadException(pipeline, byteBuf, t, close);

View File

@ -28,7 +28,6 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@ -55,7 +54,6 @@ final class EpollEventLoop extends SingleThreadEventLoop {
private final boolean allowGrowing; private final boolean allowGrowing;
private final EpollEventArray events; private final EpollEventArray events;
@SuppressWarnings("unused")
private volatile int wakenUp; private volatile int wakenUp;
private volatile int ioRatio = 50; private volatile int ioRatio = 50;