diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c index 59ee8f741a..f5a2f2acad 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c @@ -1526,6 +1526,10 @@ JNIEXPORT jboolean JNICALL Java_io_netty_channel_epoll_Native_isSupportingTcpFas return JNI_FALSE; } +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_errnoENOTCONN(JNIEnv* env, jclass clazz) { + return ENOTCONN; +} + JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_errnoEBADF(JNIEnv* env, jclass clazz) { return EBADF; } diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h index 16e88cf92f..d4900ec3d3 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h @@ -113,6 +113,7 @@ jlong Java_io_netty_channel_epoll_Native_ssizeMax(JNIEnv* env, jclass clazz); jboolean Java_io_netty_channel_epoll_Native_isSupportingSendmmsg(JNIEnv* env, jclass clazz); jboolean Java_io_netty_channel_epoll_Native_isSupportingTcpFastopen(JNIEnv* env, jclass clazz); +jint Java_io_netty_channel_epoll_Native_errnoENOTCONN(JNIEnv* env, jclass clazz); jint Java_io_netty_channel_epoll_Native_errnoEBADF(JNIEnv* env, jclass clazz); jint Java_io_netty_channel_epoll_Native_errnoEPIPE(JNIEnv* env, jclass clazz); jint Java_io_netty_channel_epoll_Native_errnoECONNRESET(JNIEnv* env, jclass clazz); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 8e751d16f5..dbeb92fe7b 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -314,16 +314,24 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann protected abstract class AbstractEpollUnsafe extends AbstractUnsafe { protected boolean readPending; + private boolean rdHup; /** * Called once EPOLLIN event is ready to be processed */ abstract void epollInReady(); + public final boolean isRdHup() { + return rdHup; + } + /** * Called once EPOLLRDHUP event is ready to be processed */ final void epollRdHupReady() { + // This must happen before we attempt to read. This will ensure reading continues until an error occurs. + rdHup = true; + if (isActive()) { // If it is still active, we need to call epollInReady as otherwise we may miss to // read pending data from the underlying file descriptor. @@ -333,6 +341,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann // Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event. clearEpollRdHup(); } + // epollInReady may call this, but we should ensure that it gets called. shutdownInput(); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java index db66538500..392a7ce1a7 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java @@ -120,7 +120,7 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im break; } } - } while (++ messages < maxMessagesPerRead); + } while (++ messages < maxMessagesPerRead || isRdHup()); } catch (Throwable t) { exception = t; } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 40ff972024..e04f68a0c8 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -825,7 +825,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { // pending data break; } - } while (++ messages < maxMessagesPerRead); + } while (++ messages < maxMessagesPerRead || isRdHup()); pipeline.fireChannelReadComplete(); allocHandle.record(totalReadAmount); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index 2103576454..0a27e7a267 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -578,7 +578,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements break; } } - } while (++ messages < maxMessagesPerRead); + } while (++ messages < maxMessagesPerRead || isRdHup()); int size = readBuf.size(); for (int i = 0; i < size; i ++) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java index 76917d19cb..58215228eb 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java @@ -171,7 +171,7 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i break; } } - } while (++ messages < maxMessagesPerRead); + } while (++ messages < maxMessagesPerRead || isRdHup()); pipeline.fireChannelReadComplete(); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 3564e46a76..ac73f8dd80 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -315,9 +315,6 @@ final class EpollEventLoop extends SingleThreadEventLoop { // past. AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe(); - // Check if an error was the cause of the wakeup. - boolean err = (ev & Native.EPOLLERR) != 0; - // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try // to read from the file descriptor. // See https://github.com/netty/netty/issues/3785 @@ -326,25 +323,27 @@ final class EpollEventLoop extends SingleThreadEventLoop { // In either case epollOutReady() will do the correct thing (finish connecting, or fail // the connection). // See https://github.com/netty/netty/issues/3848 - if (err || ((ev & Native.EPOLLOUT) != 0) && ch.isOpen()) { + if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0 && ch.isOpen()) { // Force flush of data as the epoll is writable again unsafe.epollOutReady(); } + // Check EPOLLIN before EPOLLRDHUP to ensure all data is read before shutting down the input. + // See https://github.com/netty/netty/issues/4317. + // + // If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will + // try to read from the underlying file descriptor and so notify the user about the error. + if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0 && ch.isOpen()) { + // The Channel is still open and there is something to read. Do it now. + unsafe.epollInReady(); + } + // Check if EPOLLRDHUP was set, this will notify us for connection-reset in which case // we may close the channel directly or try to read more data depending on the state of the // Channel and als depending on the AbstractEpollChannel subtype. if ((ev & Native.EPOLLRDHUP) != 0) { unsafe.epollRdHupReady(); } - - // If EPOLLOUT or EPOLLING was received and the channel is still open call epollInReady(). - // This will try to read from the underlying filedescriptor and so notify the user about the - // error. - if ((err || (ev & Native.EPOLLIN) != 0) && ch.isOpen()) { - // The Channel is still open and there is something to read. Do it now. - unsafe.epollInReady(); - } } else { // We received an event for an fd which we not use anymore. Remove it from the epoll_event set. try { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index b1a75e154c..adde3f5400 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -69,6 +69,7 @@ public final class Native { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, (byte) 0xff, (byte) 0xff }; // As all our JNI methods return -errno on error we need to compare with the negative errno codes. + private static final int ERRNO_ENOTCONN_NEGATIVE = -errnoENOTCONN(); private static final int ERRNO_EBADF_NEGATIVE = -errnoEBADF(); private static final int ERRNO_EPIPE_NEGATIVE = -errnoEPIPE(); private static final int ERRNO_ECONNRESET_NEGATIVE = -errnoECONNRESET(); @@ -83,7 +84,7 @@ public final class Native { * * The array length of 1024 should be more then enough because errno.h only holds < 200 codes. */ - private static final String[] ERRORS = new String[1024]; // + private static final String[] ERRORS = new String[1024]; // Pre-instantiated exceptions which does not need any stacktrace and // can be thrown multiple times for performance reasons. @@ -141,7 +142,7 @@ public final class Native { if (err == ERRNO_EPIPE_NEGATIVE || err == ERRNO_ECONNRESET_NEGATIVE) { throw resetCause; } - if (err == ERRNO_EBADF_NEGATIVE) { + if (err == ERRNO_EBADF_NEGATIVE || err == ERRNO_ENOTCONN_NEGATIVE) { throw CLOSED_CHANNEL_EXCEPTION; } // TODO: We could even go futher and use a pre-instanced IOException for the other error codes, but for @@ -189,6 +190,7 @@ public final class Native { private static native int errnoEBADF(); private static native int errnoEPIPE(); private static native int errnoECONNRESET(); + private static native int errnoENOTCONN(); private static native int errnoEAGAIN(); private static native int errnoEWOULDBLOCK();