diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index f5166ff743..89295370b7 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -22,8 +22,10 @@ import io.netty.buffer.Unpooled; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.UnixChannel; import io.netty.util.ReferenceCountUtil; @@ -41,6 +43,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann protected int flags = Native.EPOLLET; protected volatile boolean active; + private volatile boolean inputShutdown; AbstractEpollChannel(int fd, int flag) { this(null, fd, flag, false); @@ -175,6 +178,10 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann ((EpollEventLoop) eventLoop().unwrap()).add(this); } + protected final boolean isInputShutdown0() { + return inputShutdown; + } + @Override protected abstract AbstractEpollUnsafe newUnsafe(); @@ -306,8 +313,47 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann /** * Called once EPOLLRDHUP event is ready to be processed */ - void epollRdHupReady() { - // NOOP + final void epollRdHupReady() { + if (isActive()) { + // If it is still active, we need to call epollInReady as otherwise we may miss to + // read pending data from the underlying file descriptor. + // See https://github.com/netty/netty/issues/3709 + epollInReady(); + + // Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event. + clearEpollRdHup(); + } + // epollInReady may call this, but we should ensure that it gets called. + shutdownInput(); + } + + /** + * Clear the {@link Native#EPOLLRDHUP} flag from EPOLL, and close on failure. + */ + private void clearEpollRdHup() { + try { + clearFlag(Native.EPOLLRDHUP); + } catch (IOException e) { + pipeline().fireExceptionCaught(e); + close(voidPromise()); + } + } + + /** + * Shutdown the input side of the channel. + */ + void shutdownInput() { + if (!inputShutdown) { // Best effort check on volatile variable to prevent multiple shutdowns + inputShutdown = true; + if (isOpen()) { + if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { + clearEpollIn0(); + pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); + } else { + close(voidPromise()); + } + } + } } @Override diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 6708d04276..b07fb709db 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -23,7 +23,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; @@ -31,7 +30,6 @@ import io.netty.channel.ConnectTimeoutException; import io.netty.channel.DefaultFileRegion; import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; -import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.unix.FileDescriptor; import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.MpscLinkedQueueNode; @@ -70,7 +68,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { private SocketAddress requestedRemoteAddress; private final Queue spliceQueue = PlatformDependent.newMpscQueue(); - private volatile boolean inputShutdown; private volatile boolean outputShutdown; // Lazy init these if we need to splice(...) @@ -506,10 +503,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); } - protected boolean isInputShutdown0() { - return inputShutdown; - } - protected boolean isOutputShutdown0() { return outputShutdown || !isActive(); } @@ -594,18 +587,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { } class EpollStreamUnsafe extends AbstractEpollUnsafe { - private void closeOnRead(ChannelPipeline pipeline) { - inputShutdown = true; - if (isOpen()) { - if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { - clearEpollIn0(); - pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); - } else { - close(voidPromise()); - } - } - } - private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { if (byteBuf != null) { if (byteBuf.isReadable()) { @@ -619,7 +600,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { pipeline.fireChannelReadComplete(); pipeline.fireExceptionCaught(cause); if (close || cause instanceof IOException) { - closeOnRead(pipeline); + shutdownInput(); return true; } return false; @@ -763,18 +744,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { } } - @Override - void epollRdHupReady() { - if (isActive()) { - // If it is still active, we need to call epollInReady as otherwise we may miss to - // read pending data from the underlying file descriptor. - // See https://github.com/netty/netty/issues/3709 - epollInReady(); - } else { - closeOnRead(pipeline()); - } - } - @Override protected EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) { return new EpollRecvByteAllocatorStreamingHandle(handle, isFlagSet(Native.EPOLLET)); @@ -848,7 +817,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { pipeline.fireChannelReadComplete(); if (close) { - closeOnRead(pipeline); + shutdownInput(); + close = false; } } catch (Throwable t) { boolean closed = handleReadException(pipeline, byteBuf, t, close); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 0c0fa0f3e0..db171f22b7 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -28,7 +28,6 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Map; import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -56,7 +55,6 @@ final class EpollEventLoop extends SingleThreadEventLoop { private final boolean allowGrowing; private final EpollEventArray events; - @SuppressWarnings("unused") private volatile int wakenUp; private volatile int ioRatio = 50;