EPOLL RDHUP and IN at same time

Motivation:
If a RDHUP and IN event occurred at the same time it is possible we may not read all pending data on the channel. We should ensure we read data before processing the RDHUP event.

Modifications:
- Process the RDHUP event before the IN event.

Result:
Data will not be dropped.
Fixes https://github.com/netty/netty/issues/4317
This commit is contained in:
Scott Mitchell 2015-10-05 17:43:41 -07:00
parent 4e33b4be3b
commit 5d61ef3fed
9 changed files with 33 additions and 18 deletions

View File

@ -1526,6 +1526,10 @@ JNIEXPORT jboolean JNICALL Java_io_netty_channel_epoll_Native_isSupportingTcpFas
return JNI_FALSE;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_errnoENOTCONN(JNIEnv* env, jclass clazz) {
return ENOTCONN;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_errnoEBADF(JNIEnv* env, jclass clazz) {
return EBADF;
}

View File

@ -113,6 +113,7 @@ jlong Java_io_netty_channel_epoll_Native_ssizeMax(JNIEnv* env, jclass clazz);
jboolean Java_io_netty_channel_epoll_Native_isSupportingSendmmsg(JNIEnv* env, jclass clazz);
jboolean Java_io_netty_channel_epoll_Native_isSupportingTcpFastopen(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_errnoENOTCONN(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_errnoEBADF(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_errnoEPIPE(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_errnoECONNRESET(JNIEnv* env, jclass clazz);

View File

@ -314,16 +314,24 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
protected boolean readPending;
private boolean rdHup;
/**
* Called once EPOLLIN event is ready to be processed
*/
abstract void epollInReady();
public final boolean isRdHup() {
return rdHup;
}
/**
* Called once EPOLLRDHUP event is ready to be processed
*/
final void epollRdHupReady() {
// This must happen before we attempt to read. This will ensure reading continues until an error occurs.
rdHup = true;
if (isActive()) {
// If it is still active, we need to call epollInReady as otherwise we may miss to
// read pending data from the underlying file descriptor.
@ -333,6 +341,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
// Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event.
clearEpollRdHup();
}
// epollInReady may call this, but we should ensure that it gets called.
shutdownInput();
}

View File

@ -120,7 +120,7 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
break;
}
}
} while (++ messages < maxMessagesPerRead);
} while (++ messages < maxMessagesPerRead || isRdHup());
} catch (Throwable t) {
exception = t;
}

View File

@ -825,7 +825,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
// pending data
break;
}
} while (++ messages < maxMessagesPerRead);
} while (++ messages < maxMessagesPerRead || isRdHup());
pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);

View File

@ -578,7 +578,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
break;
}
}
} while (++ messages < maxMessagesPerRead);
} while (++ messages < maxMessagesPerRead || isRdHup());
int size = readBuf.size();
for (int i = 0; i < size; i ++) {

View File

@ -171,7 +171,7 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i
break;
}
}
} while (++ messages < maxMessagesPerRead);
} while (++ messages < maxMessagesPerRead || isRdHup());
pipeline.fireChannelReadComplete();

View File

@ -315,9 +315,6 @@ final class EpollEventLoop extends SingleThreadEventLoop {
// past.
AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
// Check if an error was the cause of the wakeup.
boolean err = (ev & Native.EPOLLERR) != 0;
// First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
// to read from the file descriptor.
// See https://github.com/netty/netty/issues/3785
@ -326,25 +323,27 @@ final class EpollEventLoop extends SingleThreadEventLoop {
// In either case epollOutReady() will do the correct thing (finish connecting, or fail
// the connection).
// See https://github.com/netty/netty/issues/3848
if (err || ((ev & Native.EPOLLOUT) != 0) && ch.isOpen()) {
if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0 && ch.isOpen()) {
// Force flush of data as the epoll is writable again
unsafe.epollOutReady();
}
// Check EPOLLIN before EPOLLRDHUP to ensure all data is read before shutting down the input.
// See https://github.com/netty/netty/issues/4317.
//
// If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will
// try to read from the underlying file descriptor and so notify the user about the error.
if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0 && ch.isOpen()) {
// The Channel is still open and there is something to read. Do it now.
unsafe.epollInReady();
}
// Check if EPOLLRDHUP was set, this will notify us for connection-reset in which case
// we may close the channel directly or try to read more data depending on the state of the
// Channel and als depending on the AbstractEpollChannel subtype.
if ((ev & Native.EPOLLRDHUP) != 0) {
unsafe.epollRdHupReady();
}
// If EPOLLOUT or EPOLLING was received and the channel is still open call epollInReady().
// This will try to read from the underlying filedescriptor and so notify the user about the
// error.
if ((err || (ev & Native.EPOLLIN) != 0) && ch.isOpen()) {
// The Channel is still open and there is something to read. Do it now.
unsafe.epollInReady();
}
} else {
// We received an event for an fd which we not use anymore. Remove it from the epoll_event set.
try {

View File

@ -69,6 +69,7 @@ public final class Native {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, (byte) 0xff, (byte) 0xff };
// As all our JNI methods return -errno on error we need to compare with the negative errno codes.
private static final int ERRNO_ENOTCONN_NEGATIVE = -errnoENOTCONN();
private static final int ERRNO_EBADF_NEGATIVE = -errnoEBADF();
private static final int ERRNO_EPIPE_NEGATIVE = -errnoEPIPE();
private static final int ERRNO_ECONNRESET_NEGATIVE = -errnoECONNRESET();
@ -83,7 +84,7 @@ public final class Native {
*
* The array length of 1024 should be more then enough because errno.h only holds < 200 codes.
*/
private static final String[] ERRORS = new String[1024]; //
private static final String[] ERRORS = new String[1024];
// Pre-instantiated exceptions which does not need any stacktrace and
// can be thrown multiple times for performance reasons.
@ -141,7 +142,7 @@ public final class Native {
if (err == ERRNO_EPIPE_NEGATIVE || err == ERRNO_ECONNRESET_NEGATIVE) {
throw resetCause;
}
if (err == ERRNO_EBADF_NEGATIVE) {
if (err == ERRNO_EBADF_NEGATIVE || err == ERRNO_ENOTCONN_NEGATIVE) {
throw CLOSED_CHANNEL_EXCEPTION;
}
// TODO: We could even go futher and use a pre-instanced IOException for the other error codes, but for
@ -189,6 +190,7 @@ public final class Native {
private static native int errnoEBADF();
private static native int errnoEPIPE();
private static native int errnoECONNRESET();
private static native int errnoENOTCONN();
private static native int errnoEAGAIN();
private static native int errnoEWOULDBLOCK();