EPOLL RDHUP and IN at same time
Motivation: If a RDHUP and IN event occurred at the same time it is possible we may not read all pending data on the channel. We should ensure we read data before processing the RDHUP event. Modifications: - Process the RDHUP event before the IN event. Result: Data will not be dropped. Fixes https://github.com/netty/netty/issues/4317
This commit is contained in:
parent
b2399c2475
commit
32231ee2e0
@ -1526,6 +1526,10 @@ JNIEXPORT jboolean JNICALL Java_io_netty_channel_epoll_Native_isSupportingTcpFas
|
||||
return JNI_FALSE;
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_errnoENOTCONN(JNIEnv* env, jclass clazz) {
|
||||
return ENOTCONN;
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_errnoEBADF(JNIEnv* env, jclass clazz) {
|
||||
return EBADF;
|
||||
}
|
||||
|
@ -113,6 +113,7 @@ jlong Java_io_netty_channel_epoll_Native_ssizeMax(JNIEnv* env, jclass clazz);
|
||||
jboolean Java_io_netty_channel_epoll_Native_isSupportingSendmmsg(JNIEnv* env, jclass clazz);
|
||||
jboolean Java_io_netty_channel_epoll_Native_isSupportingTcpFastopen(JNIEnv* env, jclass clazz);
|
||||
|
||||
jint Java_io_netty_channel_epoll_Native_errnoENOTCONN(JNIEnv* env, jclass clazz);
|
||||
jint Java_io_netty_channel_epoll_Native_errnoEBADF(JNIEnv* env, jclass clazz);
|
||||
jint Java_io_netty_channel_epoll_Native_errnoEPIPE(JNIEnv* env, jclass clazz);
|
||||
jint Java_io_netty_channel_epoll_Native_errnoECONNRESET(JNIEnv* env, jclass clazz);
|
||||
|
@ -344,6 +344,9 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
||||
* Called once EPOLLRDHUP event is ready to be processed
|
||||
*/
|
||||
final void epollRdHupReady() {
|
||||
// This must happen before we attempt to read. This will ensure reading continues until an error occurs.
|
||||
recvBufAllocHandle().receivedRdHup();
|
||||
|
||||
if (isActive()) {
|
||||
// If it is still active, we need to call epollInReady as otherwise we may miss to
|
||||
// read pending data from the underlying file descriptor.
|
||||
@ -353,6 +356,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
||||
// Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event.
|
||||
clearEpollRdHup();
|
||||
}
|
||||
|
||||
// epollInReady may call this, but we should ensure that it gets called.
|
||||
shutdownInput();
|
||||
}
|
||||
|
@ -314,9 +314,6 @@ final class EpollEventLoop extends SingleThreadEventLoop {
|
||||
// past.
|
||||
AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
|
||||
|
||||
// Check if an error was the cause of the wakeup.
|
||||
boolean err = (ev & Native.EPOLLERR) != 0;
|
||||
|
||||
// First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
|
||||
// to read from the file descriptor.
|
||||
// See https://github.com/netty/netty/issues/3785
|
||||
@ -325,25 +322,27 @@ final class EpollEventLoop extends SingleThreadEventLoop {
|
||||
// In either case epollOutReady() will do the correct thing (finish connecting, or fail
|
||||
// the connection).
|
||||
// See https://github.com/netty/netty/issues/3848
|
||||
if (err || ((ev & Native.EPOLLOUT) != 0) && ch.isOpen()) {
|
||||
if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0 && ch.isOpen()) {
|
||||
// Force flush of data as the epoll is writable again
|
||||
unsafe.epollOutReady();
|
||||
}
|
||||
|
||||
// Check EPOLLIN before EPOLLRDHUP to ensure all data is read before shutting down the input.
|
||||
// See https://github.com/netty/netty/issues/4317.
|
||||
//
|
||||
// If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will
|
||||
// try to read from the underlying file descriptor and so notify the user about the error.
|
||||
if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0 && ch.isOpen()) {
|
||||
// The Channel is still open and there is something to read. Do it now.
|
||||
unsafe.epollInReady();
|
||||
}
|
||||
|
||||
// Check if EPOLLRDHUP was set, this will notify us for connection-reset in which case
|
||||
// we may close the channel directly or try to read more data depending on the state of the
|
||||
// Channel and als depending on the AbstractEpollChannel subtype.
|
||||
if ((ev & Native.EPOLLRDHUP) != 0) {
|
||||
unsafe.epollRdHupReady();
|
||||
}
|
||||
|
||||
// If EPOLLOUT or EPOLLING was received and the channel is still open call epollInReady().
|
||||
// This will try to read from the underlying filedescriptor and so notify the user about the
|
||||
// error.
|
||||
if ((err || (ev & Native.EPOLLIN) != 0) && ch.isOpen()) {
|
||||
// The Channel is still open and there is something to read. Do it now.
|
||||
unsafe.epollInReady();
|
||||
}
|
||||
} else {
|
||||
// We received an event for an fd which we not use anymore. Remove it from the epoll_event set.
|
||||
try {
|
||||
|
@ -19,6 +19,7 @@ import io.netty.channel.RecvByteBufAllocator;
|
||||
|
||||
abstract class EpollRecvByteAllocatorHandle extends RecvByteBufAllocator.DelegatingHandle {
|
||||
private final boolean isEdgeTriggered;
|
||||
private boolean receivedRdHup;
|
||||
|
||||
public EpollRecvByteAllocatorHandle(RecvByteBufAllocator.Handle handle, boolean isEdgeTriggered) {
|
||||
super(handle);
|
||||
@ -28,4 +29,12 @@ abstract class EpollRecvByteAllocatorHandle extends RecvByteBufAllocator.Delegat
|
||||
public final boolean isEdgeTriggered() {
|
||||
return isEdgeTriggered;
|
||||
}
|
||||
|
||||
public final void receivedRdHup() {
|
||||
receivedRdHup = true;
|
||||
}
|
||||
|
||||
public final boolean isRdHup() {
|
||||
return receivedRdHup;
|
||||
}
|
||||
}
|
||||
|
@ -31,7 +31,9 @@ final class EpollRecvByteAllocatorMessageHandle extends EpollRecvByteAllocatorHa
|
||||
* If edgeTriggered is used we need to read all bytes/messages as we are not notified again otherwise. For
|
||||
* packet oriented descriptors must read until we get a EAGAIN
|
||||
* (see Q9 in <a href="http://man7.org/linux/man-pages/man7/epoll.7.html">epoll man</a>).
|
||||
*
|
||||
* If EPOLLRDHUP has been received we must read until we get a read error.
|
||||
*/
|
||||
return isEdgeTriggered() ? true : super.continueReading();
|
||||
return isEdgeTriggered() || isRdHup() ? true : super.continueReading();
|
||||
}
|
||||
}
|
||||
|
@ -32,7 +32,10 @@ final class EpollRecvByteAllocatorStreamingHandle extends EpollRecvByteAllocator
|
||||
* if edgeTriggered is used we need to read all bytes/messages as we are not notified again otherwise.
|
||||
* For stream oriented descriptors we can assume we are done reading if the last read attempt didn't produce
|
||||
* a full buffer (see Q9 in <a href="http://man7.org/linux/man-pages/man7/epoll.7.html">epoll man</a>).
|
||||
*
|
||||
* If EPOLLRDHUP has been received we must read until we get a read error.
|
||||
*/
|
||||
return isEdgeTriggered() ? lastBytesRead() == attemptedBytesRead() : super.continueReading();
|
||||
return isRdHup() ? true :
|
||||
isEdgeTriggered() ? lastBytesRead() == attemptedBytesRead() : super.continueReading();
|
||||
}
|
||||
}
|
||||
|
@ -69,6 +69,7 @@ public final class Native {
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, (byte) 0xff, (byte) 0xff };
|
||||
|
||||
// As all our JNI methods return -errno on error we need to compare with the negative errno codes.
|
||||
private static final int ERRNO_ENOTCONN_NEGATIVE = -errorENOTCONN();
|
||||
private static final int ERRNO_EBADF_NEGATIVE = -errnoEBADF();
|
||||
private static final int ERRNO_EPIPE_NEGATIVE = -errnoEPIPE();
|
||||
private static final int ERRNO_ECONNRESET_NEGATIVE = -errnoECONNRESET();
|
||||
@ -83,7 +84,7 @@ public final class Native {
|
||||
*
|
||||
* The array length of 1024 should be more then enough because errno.h only holds < 200 codes.
|
||||
*/
|
||||
private static final String[] ERRORS = new String[1024]; //
|
||||
private static final String[] ERRORS = new String[1024];
|
||||
|
||||
// Pre-instantiated exceptions which does not need any stacktrace and
|
||||
// can be thrown multiple times for performance reasons.
|
||||
@ -141,7 +142,7 @@ public final class Native {
|
||||
if (err == ERRNO_EPIPE_NEGATIVE || err == ERRNO_ECONNRESET_NEGATIVE) {
|
||||
throw resetCause;
|
||||
}
|
||||
if (err == ERRNO_EBADF_NEGATIVE) {
|
||||
if (err == ERRNO_EBADF_NEGATIVE || err == ERRNO_ENOTCONN_NEGATIVE) {
|
||||
throw CLOSED_CHANNEL_EXCEPTION;
|
||||
}
|
||||
// TODO: We could even go futher and use a pre-instanced IOException for the other error codes, but for
|
||||
@ -189,6 +190,7 @@ public final class Native {
|
||||
private static native int errnoEBADF();
|
||||
private static native int errnoEPIPE();
|
||||
private static native int errnoECONNRESET();
|
||||
private static native int errorENOTCONN();
|
||||
|
||||
private static native int errnoEAGAIN();
|
||||
private static native int errnoEWOULDBLOCK();
|
||||
|
Loading…
Reference in New Issue
Block a user