From d57dfaa7944c141cdb9f36bb8eae41a7411c39e6 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Wed, 7 Oct 2015 14:12:38 -0700 Subject: [PATCH] EPOLL Shutdown and Half Closed Motivation: The EPOLL module was not completly respecting the half closed state. It may have missed events, or procssed events when it should not have due to checking isOpen instead of the appropriate shutdown state. Modifications: - use FileDescriptor's isShutdown* methods instead of isOpen to check for processing events. Result: Half closed code in EPOLL module is more correct. --- .../channel/epoll/AbstractEpollChannel.java | 41 ++++++++----------- .../epoll/AbstractEpollServerChannel.java | 3 ++ .../epoll/AbstractEpollStreamChannel.java | 10 ++--- .../channel/epoll/EpollDatagramChannel.java | 3 ++ .../epoll/EpollDomainSocketChannel.java | 3 ++ .../netty/channel/epoll/EpollEventLoop.java | 4 +- .../channel/epoll/EpollSocketChannel.java | 4 +- .../java/io/netty/channel/unix/Socket.java | 4 ++ 8 files changed, 37 insertions(+), 35 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 161f0bf1de..fb113576cd 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -26,7 +26,6 @@ import io.netty.channel.ChannelOption; import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.ChannelInputShutdownEvent; -import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.Socket; import io.netty.channel.unix.UnixChannel; import io.netty.util.ReferenceCountUtil; @@ -46,7 +45,6 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann protected int flags = Native.EPOLLET; protected volatile boolean active; - private volatile boolean inputShutdown; AbstractEpollChannel(Socket fd, int flag) { this(null, fd, flag, false); @@ -98,15 +96,14 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann @Override protected void doClose() throws Exception { - boolean active = this.active; this.active = false; - FileDescriptor fd = fileDescriptor; + Socket fd = fileDescriptor; try { // deregister from epoll now and shutdown the socket. doDeregister(); - if (active) { + if (!fd.isShutdown()) { try { - fd().shutdown(true, true); + fd().shutdown(); } catch (IOException ignored) { // The FD will be closed, so if shutdown fails there is nothing we can do. } @@ -182,10 +179,6 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann ((EpollEventLoop) eventLoop().unwrap()).add(this); } - protected final boolean isInputShutdown0() { - return inputShutdown; - } - @Override protected abstract AbstractEpollUnsafe newUnsafe(); @@ -317,7 +310,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann * @param edgeTriggered {@code true} if the channel is using ET mode. {@code false} otherwise. */ final void checkResetEpollIn(boolean edgeTriggered) { - if (edgeTriggered && !isInputShutdown0()) { + if (edgeTriggered && !fd().isInputShutdown()) { // trigger a read again as there may be something left to read and because of epoll ET we // will not get notified again until we read everything from the socket eventLoop().execute(new OneTimeTask() { @@ -366,21 +359,18 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann * Shutdown the input side of the channel. */ void shutdownInput() { - if (!inputShutdown) { // Best effort check on volatile variable to prevent multiple shutdowns - inputShutdown = true; - if (isOpen()) { - if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { - try { - fd().shutdown(true, false); - clearEpollIn0(); - pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); - } catch (IOException e) { - pipeline().fireExceptionCaught(e); - close(voidPromise()); - } - } else { + if (!fd().isInputShutdown()) { + if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { + try { + fd().shutdown(true, false); + clearEpollIn0(); + pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); + } catch (IOException e) { + pipeline().fireExceptionCaught(e); close(voidPromise()); } + } else { + close(voidPromise()); } } } @@ -414,6 +404,9 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann * Called once a EPOLLOUT event is ready to be processed */ void epollOutReady() { + if (fd().isOutputShutdown()) { + return; + } // directly call super.flush0() to force a flush now super.flush0(); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java index db7c25b1f9..ed41f3a526 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java @@ -112,6 +112,9 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im @Override void epollInReady() { assert eventLoop().inEventLoop(); + if (fd().isInputShutdown()) { + return; + } boolean edgeTriggered = isFlagSet(Native.EPOLLET); final ChannelConfig config = config(); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 087d43614e..f3c28a3f64 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -71,8 +71,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { private SocketAddress requestedRemoteAddress; private final Queue spliceQueue = PlatformDependent.newMpscQueue(); - private volatile boolean outputShutdown; - // Lazy init these if we need to splice(...) private FileDescriptor pipeIn; private FileDescriptor pipeOut; @@ -528,14 +526,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); } - protected boolean isOutputShutdown0() { - return outputShutdown || !isActive(); - } - protected void shutdownOutput0(final ChannelPromise promise) { try { fd().shutdown(false, true); - outputShutdown = true; promise.setSuccess(); } catch (Throwable cause) { promise.setFailure(cause); @@ -774,6 +767,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { @Override void epollInReady() { + if (fd().isInputShutdown()) { + return; + } final ChannelConfig config = config(); boolean edgeTriggered = isFlagSet(Native.EPOLLET); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index edda4e5e30..99e7f79cfe 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -526,6 +526,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements @Override void epollInReady() { assert eventLoop().inEventLoop(); + if (fd().isInputShutdown()) { + return; + } DatagramChannelConfig config = config(); boolean edgeTriggered = isFlagSet(Native.EPOLLET); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java index e877d1b0a4..aa7d4bb170 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java @@ -148,6 +148,9 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i } private void epollInReadFd() { + if (fd().isInputShutdown()) { + return; + } boolean edgeTriggered = isFlagSet(Native.EPOLLET); final ChannelConfig config = config(); if (!readPending && !edgeTriggered && !config.isAutoRead()) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 028e1d1350..67ff1736e1 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -325,7 +325,7 @@ final class EpollEventLoop extends SingleThreadEventLoop { // In either case epollOutReady() will do the correct thing (finish connecting, or fail // the connection). // See https://github.com/netty/netty/issues/3848 - if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0 && ch.isOpen()) { + if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) { // Force flush of data as the epoll is writable again unsafe.epollOutReady(); } @@ -335,7 +335,7 @@ final class EpollEventLoop extends SingleThreadEventLoop { // // If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will // try to read from the underlying file descriptor and so notify the user about the error. - if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0 && ch.isOpen()) { + if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) { // The Channel is still open and there is something to read. Do it now. unsafe.epollInReady(); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index ffcf9d7049..72a2b6ae1b 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -149,12 +149,12 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme @Override public boolean isInputShutdown() { - return isInputShutdown0(); + return fd().isInputShutdown(); } @Override public boolean isOutputShutdown() { - return isOutputShutdown0(); + return fd().isOutputShutdown(); } @Override diff --git a/transport-native-epoll/src/main/java/io/netty/channel/unix/Socket.java b/transport-native-epoll/src/main/java/io/netty/channel/unix/Socket.java index d92709f886..9ed9e6cf7c 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/unix/Socket.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/unix/Socket.java @@ -58,6 +58,10 @@ public final class Socket extends FileDescriptor { } } + public void shutdown() throws IOException { + shutdown(true, true); + } + public boolean isShutdown() { return isInputShutdown() && isOutputShutdown(); }