diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index de3d5bd384..b809c68b32 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -26,7 +26,6 @@ import io.netty.channel.ChannelOption; import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.ChannelInputShutdownEvent; -import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.Socket; import io.netty.channel.unix.UnixChannel; import io.netty.util.ReferenceCountUtil; @@ -46,7 +45,6 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann protected int flags = Native.EPOLLET; protected volatile boolean active; - private volatile boolean inputShutdown; AbstractEpollChannel(Socket fd, int flag) { this(null, fd, flag, false); @@ -98,15 +96,14 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann @Override protected void doClose() throws Exception { - boolean active = this.active; this.active = false; - FileDescriptor fd = fileDescriptor; + Socket fd = fileDescriptor; try { // deregister from epoll now and shutdown the socket. doDeregister(); - if (active) { + if (!fd.isShutdown()) { try { - fd().shutdown(true, true); + fd().shutdown(); } catch (IOException ignored) { // The FD will be closed, so if shutdown fails there is nothing we can do. } @@ -183,10 +180,6 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann loop.add(this); } - protected final boolean isInputShutdown0() { - return inputShutdown; - } - @Override protected abstract AbstractEpollUnsafe newUnsafe(); @@ -318,7 +311,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann * @param edgeTriggered {@code true} if the channel is using ET mode. {@code false} otherwise. */ final void checkResetEpollIn(boolean edgeTriggered) { - if (edgeTriggered && !isInputShutdown0()) { + if (edgeTriggered && !fd().isInputShutdown()) { // trigger a read again as there may be something left to read and because of epoll ET we // will not get notified again until we read everything from the socket eventLoop().execute(new OneTimeTask() { @@ -367,21 +360,18 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann * Shutdown the input side of the channel. */ void shutdownInput() { - if (!inputShutdown) { // Best effort check on volatile variable to prevent multiple shutdowns - inputShutdown = true; - if (isOpen()) { - if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { - try { - fd().shutdown(true, false); - clearEpollIn0(); - pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); - } catch (IOException e) { - pipeline().fireExceptionCaught(e); - close(voidPromise()); - } - } else { + if (!fd().isInputShutdown()) { + if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { + try { + fd().shutdown(true, false); + clearEpollIn0(); + pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); + } catch (IOException e) { + pipeline().fireExceptionCaught(e); close(voidPromise()); } + } else { + close(voidPromise()); } } } @@ -415,6 +405,9 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann * Called once a EPOLLOUT event is ready to be processed */ void epollOutReady() { + if (fd().isOutputShutdown()) { + return; + } // directly call super.flush0() to force a flush now super.flush0(); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java index db7c25b1f9..ed41f3a526 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java @@ -112,6 +112,9 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im @Override void epollInReady() { assert eventLoop().inEventLoop(); + if (fd().isInputShutdown()) { + return; + } boolean edgeTriggered = isFlagSet(Native.EPOLLET); final ChannelConfig config = config(); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 4d6bfa9ee6..e1ce9191f8 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -71,8 +71,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { private SocketAddress requestedRemoteAddress; private final Queue spliceQueue = PlatformDependent.newMpscQueue(); - private volatile boolean outputShutdown; - // Lazy init these if we need to splice(...) private FileDescriptor pipeIn; private FileDescriptor pipeOut; @@ -528,14 +526,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); } - protected boolean isOutputShutdown0() { - return outputShutdown || !isActive(); - } - protected void shutdownOutput0(final ChannelPromise promise) { try { fd().shutdown(false, true); - outputShutdown = true; promise.setSuccess(); } catch (Throwable cause) { promise.setFailure(cause); @@ -774,6 +767,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { @Override void epollInReady() { + if (fd().isInputShutdown()) { + return; + } final ChannelConfig config = config(); boolean edgeTriggered = isFlagSet(Native.EPOLLET); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index edda4e5e30..99e7f79cfe 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -526,6 +526,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements @Override void epollInReady() { assert eventLoop().inEventLoop(); + if (fd().isInputShutdown()) { + return; + } DatagramChannelConfig config = config(); boolean edgeTriggered = isFlagSet(Native.EPOLLET); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java index e877d1b0a4..aa7d4bb170 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java @@ -148,6 +148,9 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i } private void epollInReadFd() { + if (fd().isInputShutdown()) { + return; + } boolean edgeTriggered = isFlagSet(Native.EPOLLET); final ChannelConfig config = config(); if (!readPending && !edgeTriggered && !config.isAutoRead()) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 8955e07764..47bd838b96 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -323,7 +323,7 @@ final class EpollEventLoop extends SingleThreadEventLoop { // In either case epollOutReady() will do the correct thing (finish connecting, or fail // the connection). // See https://github.com/netty/netty/issues/3848 - if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0 && ch.isOpen()) { + if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) { // Force flush of data as the epoll is writable again unsafe.epollOutReady(); } @@ -333,7 +333,7 @@ final class EpollEventLoop extends SingleThreadEventLoop { // // If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will // try to read from the underlying file descriptor and so notify the user about the error. - if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0 && ch.isOpen()) { + if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) { // The Channel is still open and there is something to read. Do it now. unsafe.epollInReady(); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index ffcf9d7049..72a2b6ae1b 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -149,12 +149,12 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme @Override public boolean isInputShutdown() { - return isInputShutdown0(); + return fd().isInputShutdown(); } @Override public boolean isOutputShutdown() { - return isOutputShutdown0(); + return fd().isOutputShutdown(); } @Override diff --git a/transport-native-epoll/src/main/java/io/netty/channel/unix/Socket.java b/transport-native-epoll/src/main/java/io/netty/channel/unix/Socket.java index d92709f886..9ed9e6cf7c 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/unix/Socket.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/unix/Socket.java @@ -58,6 +58,10 @@ public final class Socket extends FileDescriptor { } } + public void shutdown() throws IOException { + shutdown(true, true); + } + public boolean isShutdown() { return isInputShutdown() && isOutputShutdown(); }