From ffc862dd315dfcdfcf2054878165ce3103b76f2c Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 2 Feb 2015 11:51:19 +0100 Subject: [PATCH] Faster event processing when epoll transport is used Motivation: Before we used a long[] to store the ready events, this had a few problems and limitations: - An extra loop was needed to translate between epoll_event and our long - JNI may need to do extra memory copy if the JVM not supports pinning - More branches Modifications: - Introduce a EpollEventArray which allows to directly write in a struct epoll_event* and pass it to epoll_wait. Result: Better speed when using native transport, as shown in the benchmark. Before: [xxx@xxx wrk]$ ./wrk -H 'Connection: keep-alive' -d 120 -c 256 -t 16 -s scripts/pipeline-many.lua http://xxx:8080/plaintext Running 2m test @ http://xxx:8080/plaintext 16 threads and 256 connections Thread Stats Avg Stdev Max +/- Stdev Latency 14.56ms 8.64ms 117.15ms 80.58% Req/Sec 286.17k 38.71k 421.48k 68.17% 546324329 requests in 2.00m, 73.78GB read Requests/sec: 4553438.39 Transfer/sec: 629.66MB After: [xxx@xxx wrk]$ ./wrk -H 'Connection: keep-alive' -d 120 -c 256 -t 16 -s scripts/pipeline-many.lua http://xxx:8080/plaintext Running 2m test @ http://xxx:8080/plaintext 16 threads and 256 connections Thread Stats Avg Stdev Max +/- Stdev Latency 14.12ms 8.69ms 100.40ms 83.08% Req/Sec 294.79k 40.23k 472.70k 66.75% 555997226 requests in 2.00m, 75.08GB read Requests/sec: 4634343.40 Transfer/sec: 640.85MB --- .../main/c/io_netty_channel_epoll_Native.c | 94 +++++-------- .../main/c/io_netty_channel_epoll_Native.h | 19 +-- .../channel/epoll/AbstractEpollChannel.java | 1 - .../netty/channel/epoll/EpollEventArray.java | 104 +++++++++++++++ .../netty/channel/epoll/EpollEventLoop.java | 126 +++++++++--------- .../channel/epoll/EpollSocketChannel.java | 4 + .../java/io/netty/channel/epoll/Native.java | 32 ++++- 7 files changed, 237 insertions(+), 143 deletions(-) create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventArray.java diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c index 98f585aec5..0cd6521a5b 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c @@ -30,6 +30,7 @@ #include #include #include +#include #include "io_netty_channel_epoll_Native.h" // optional @@ -104,20 +105,12 @@ char* exceptionMessage(char* msg, int error) { return result; } -jint epollCtl(JNIEnv* env, jint efd, int op, jint fd, jint flags, jint id) { - uint32_t events = (flags & EPOLL_EDGE) ? EPOLLET : 0; - - if (flags & EPOLL_READ) { - events |= EPOLLIN | EPOLLRDHUP; - } - if (flags & EPOLL_WRITE) { - events |= EPOLLOUT; - } +jint epollCtl(JNIEnv* env, jint efd, int op, jint fd, jint flags) { + uint32_t events = flags; struct epoll_event ev = { - .events = events, - // encode the id into the events - .data.u64 = (((uint64_t) id) << 32L) + .data.fd = fd, + .events = events }; return epoll_ctl(efd, op, fd, &ev); @@ -620,9 +613,8 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollCreate(JNIEnv* en return efd; } -JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollWait(JNIEnv* env, jclass clazz, jint efd, jlongArray events, jint timeout) { - int len = (*env)->GetArrayLength(env, events); - struct epoll_event ev[len]; +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timeout) { + struct epoll_event *ev = (struct epoll_event*) address; int ready; int err; do { @@ -631,60 +623,20 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollWait(JNIEnv* env, } while (ready == -1 && ((err = errno) == EINTR)); if (ready < 0) { - throwIOException(env, exceptionMessage("epoll_wait() failed: ", err)); - return -1; + return -err; } - if (ready == 0) { - // nothing ready for process - return 0; - } - - jboolean isCopy; - // Use GetPrimitiveArrayCritical and ReleasePrimitiveArrayCritical to signal the VM that we really would like - // to not do a memory copy here. This is ok as we not do any blocking action here anyway. - // This is important as the VM may suspend GC for the time! - jlong* elements = (*env)->GetPrimitiveArrayCritical(env, events, &isCopy); - if (elements == NULL) { - // No memory left ?!?!? - throwOutOfMemoryError(env); - return -1; - } - int i; - for (i = 0; i < ready; i++) { - // store the ready ops and id - elements[i] = (jlong) ev[i].data.u64; - if (ev[i].events & EPOLLIN) { - elements[i] |= EPOLL_READ; - } - if (ev[i].events & EPOLLRDHUP) { - elements[i] |= EPOLL_RDHUP; - } - if (ev[i].events & EPOLLOUT) { - elements[i] |= EPOLL_WRITE; - } - } - jint mode; - // release again to prevent memory leak - if (isCopy) { - mode = 0; - } else { - // was just pinned so use JNI_ABORT to eliminate not needed copy. - mode = JNI_ABORT; - } - (*env)->ReleasePrimitiveArrayCritical(env, events, elements, mode); - return ready; } -JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlAdd(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags, jint id) { - if (epollCtl(env, efd, EPOLL_CTL_ADD, fd, flags, id) < 0) { +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlAdd(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags) { + if (epollCtl(env, efd, EPOLL_CTL_ADD, fd, flags) < 0) { int err = errno; throwRuntimeException(env, exceptionMessage("epoll_ctl() failed: ", err)); } } -JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlMod(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags, jint id) { - if (epollCtl(env, efd, EPOLL_CTL_MOD, fd, flags, id) < 0) { +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlMod(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags) { + if (epollCtl(env, efd, EPOLL_CTL_MOD, fd, flags) < 0) { int err = errno; throwRuntimeException(env, exceptionMessage("epoll_ctl() failed: ", err)); } @@ -1526,4 +1478,26 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendFd0(JNIEnv* env, j return -1; } +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollet(JNIEnv* env, jclass clazz) { + return EPOLLET; +} +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollin(JNIEnv* env, jclass clazz) { + return EPOLLIN; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollout(JNIEnv* env, jclass clazz) { + return EPOLLOUT; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollrdhup(JNIEnv* env, jclass clazz) { + return EPOLLRDHUP; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sizeofEpollEvent(JNIEnv* env, jclass clazz) { + return sizeof(struct epoll_event); +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_offsetofEpollData(JNIEnv* env, jclass clazz) { + return offsetof(struct epoll_event, data); +} \ No newline at end of file diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h index e620e5026b..09f3865e2f 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h @@ -16,12 +16,6 @@ #include #include -#define EPOLL_READ 0x01 -#define EPOLL_WRITE 0x02 -#define EPOLL_RDHUP 0x04 -#define EPOLL_EDGE 0x08 - - // Define SO_REUSEPORT if not found to fix build issues. // See https://github.com/netty/netty/issues/2558 #ifndef SO_REUSEPORT @@ -43,9 +37,9 @@ jint Java_io_netty_channel_epoll_Native_eventFd(JNIEnv* env, jclass clazz); void Java_io_netty_channel_epoll_Native_eventFdWrite(JNIEnv* env, jclass clazz, jint fd, jlong value); void Java_io_netty_channel_epoll_Native_eventFdRead(JNIEnv* env, jclass clazz, jint fd); jint Java_io_netty_channel_epoll_Native_epollCreate(JNIEnv* env, jclass clazz); -jint Java_io_netty_channel_epoll_Native_epollWait(JNIEnv* env, jclass clazz, jint efd, jlongArray events, jint timeout); -void Java_io_netty_channel_epoll_Native_epollCtlAdd(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags, jint id); -void Java_io_netty_channel_epoll_Native_epollCtlMod(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags, jint id); +jint Java_io_netty_channel_epoll_Native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint length, jint timeout); +void Java_io_netty_channel_epoll_Native_epollCtlAdd(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags); +void Java_io_netty_channel_epoll_Native_epollCtlMod(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags); void Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv* env, jclass clazz, jint efd, jint fd); jint Java_io_netty_channel_epoll_Native_write0(JNIEnv* env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit); jint Java_io_netty_channel_epoll_Native_writeAddress0(JNIEnv* env, jclass clazz, jint fd, jlong address, jint pos, jint limit); @@ -117,3 +111,10 @@ jint Java_io_netty_channel_epoll_Native_errnoEAGAIN(JNIEnv* env, jclass clazz); jint Java_io_netty_channel_epoll_Native_errnoEWOULDBLOCK(JNIEnv* env, jclass clazz); jint Java_io_netty_channel_epoll_Native_errnoEINPROGRESS(JNIEnv* env, jclass clazz); jstring Java_io_netty_channel_epoll_Native_strError(JNIEnv* env, jclass clazz, jint err); + +jint Java_io_netty_channel_epoll_Native_epollin(JNIEnv* env, jclass clazz); +jint Java_io_netty_channel_epoll_Native_epollout(JNIEnv* env, jclass clazz); +jint Java_io_netty_channel_epoll_Native_epollrdhup(JNIEnv* env, jclass clazz); +jint Java_io_netty_channel_epoll_Native_epollet(JNIEnv* env, jclass clazz); +jint Java_io_netty_channel_epoll_Native_sizeofEpollEvent(JNIEnv* env, jclass clazz); +jint Java_io_netty_channel_epoll_Native_offsetofEpollData(JNIEnv* env, jclass clazz); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 56be0d42b0..7598f5b554 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -38,7 +38,6 @@ abstract class AbstractEpollChannel extends AbstractChannel { protected int flags = Native.EPOLLET; protected volatile boolean active; - int id; AbstractEpollChannel(int fd, int flag) { this(null, fd, flag, false); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventArray.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventArray.java new file mode 100644 index 0000000000..2d84703baf --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventArray.java @@ -0,0 +1,104 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.util.internal.PlatformDependent; + +/** + * This is an internal datastructure which can be directly passed to epoll_wait to reduce the overhead. + * + * typedef union epoll_data { + * void *ptr; + * int fd; + * uint32_t u32; + * uint64_t u64; + * } epoll_data_t; + * + * struct epoll_event { + * uint32_t events; // Epoll events + * epoll_data_t data; // User data variable + * }; + * + * We use {@code fd} if the {@code epoll_data union} to store the actual file descriptor of an + * {@link AbstractEpollChannel} and so be able to map it later. + */ +final class EpollEventArray { + // Size of the epoll_event struct + private static final int EPOLL_EVENT_SIZE = Native.sizeofEpollEvent(); + // The offsiet of the data union in the epoll_event struct + private static final int EPOLL_DATA_OFFSET = Native.offsetofEpollData(); + + private long memoryAddress; + private int length; + + EpollEventArray(int length) { + if (length < 1) { + throw new IllegalArgumentException("length must be >= 1 but was " + length); + } + this.length = length; + memoryAddress = allocate(length); + } + + private static long allocate(int length) { + return PlatformDependent.allocateMemory(length * EPOLL_EVENT_SIZE); + } + + /** + * Return the {@code memoryAddress} which points to the start of this {@link EpollEventArray}. + */ + long memoryAddress() { + return memoryAddress; + } + + /** + * Return the length of the {@link EpollEventArray} which represent the maximum number of {@code epoll_events} + * that can be stored in it. + */ + int length() { + return length; + } + + /** + * Increase the storage of this {@link EpollEventArray}. + */ + void increase() { + // double the size + length <<= 1; + free(); + memoryAddress = allocate(length); + } + + /** + * Free this {@link EpollEventArray}. Any usage after calling this method may segfault the JVM! + */ + void free() { + PlatformDependent.freeMemory(memoryAddress); + } + + /** + * Return the events for the {@code epoll_event} on this index. + */ + int events(int index) { + return PlatformDependent.getInt(memoryAddress + index * EPOLL_EVENT_SIZE); + } + + /** + * Return the file descriptor for the {@code epoll_event} on this index. + */ + int fd(int index) { + return PlatformDependent.getInt(memoryAddress + index * EPOLL_EVENT_SIZE + EPOLL_DATA_OFFSET); + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index e10202acc6..a9c02632a0 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -51,11 +51,9 @@ final class EpollEventLoop extends SingleThreadEventLoop { private final int epollFd; private final int eventFd; - private final IntObjectMap ids = new IntObjectHashMap(); + private final IntObjectMap channels = new IntObjectHashMap(4096); private final boolean allowGrowing; - private long[] events; - - private int id; + private final EpollEventArray events; @SuppressWarnings("unused") private volatile int wakenUp; @@ -65,10 +63,10 @@ final class EpollEventLoop extends SingleThreadEventLoop { super(parent, executor, false); if (maxEvents == 0) { allowGrowing = true; - events = new long[128]; + events = new EpollEventArray(4096); } else { allowGrowing = false; - events = new long[maxEvents]; + events = new EpollEventArray(maxEvents); } boolean success = false; int epollFd = -1; @@ -76,7 +74,7 @@ final class EpollEventLoop extends SingleThreadEventLoop { try { this.epollFd = epollFd = Native.epollCreate(); this.eventFd = eventFd = Native.eventFd(); - Native.epollCtlAdd(epollFd, eventFd, Native.EPOLLIN, 0); + Native.epollCtlAdd(epollFd, eventFd, Native.EPOLLIN); success = true; } finally { if (!success) { @@ -98,29 +96,6 @@ final class EpollEventLoop extends SingleThreadEventLoop { } } - private int nextId() { - int id = this.id; - if (id == Integer.MAX_VALUE) { - // We used all possible ints in the past ( 1 - Integer.MAX_VALUE), time to scrub the stored channels - // and re-assign ids. - AbstractEpollChannel[] channels = ids.values(AbstractEpollChannel.class); - ids.clear(); - - id = 0; - - for (AbstractEpollChannel ch: channels) { - id++; - ch.id = id; - ids.put(ch.id, ch); - } - if (id == Integer.MAX_VALUE) { - throw new IllegalStateException("Could not scrub ids"); - } - } - this.id = ++id; - return id; - } - @Override protected void wakeup(boolean inEventLoop) { if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) { @@ -134,10 +109,9 @@ final class EpollEventLoop extends SingleThreadEventLoop { */ void add(AbstractEpollChannel ch) { assert inEventLoop(); - int id = nextId(); - Native.epollCtlAdd(epollFd, ch.fd().intValue(), ch.flags, id); - ch.id = id; - ids.put(id, ch); + int fd = ch.fd().intValue(); + Native.epollCtlAdd(epollFd, fd, ch.flags); + channels.put(fd, ch); } /** @@ -145,7 +119,7 @@ final class EpollEventLoop extends SingleThreadEventLoop { */ void modify(AbstractEpollChannel ch) { assert inEventLoop(); - Native.epollCtlMod(epollFd, ch.fd().intValue(), ch.flags, ch.id); + Native.epollCtlMod(epollFd, ch.fd().intValue(), ch.flags); } /** @@ -153,10 +127,14 @@ final class EpollEventLoop extends SingleThreadEventLoop { */ void remove(AbstractEpollChannel ch) { assert inEventLoop(); - if (ids.remove(ch.id) != null && ch.isOpen()) { - // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically - // removed once the file-descriptor is closed. - Native.epollCtlDel(epollFd, ch.fd().intValue()); + + if (ch.isOpen()) { + int fd = ch.fd().intValue(); + if (channels.remove(fd) != null) { + // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically + // removed once the file-descriptor is closed. + Native.epollCtlDel(epollFd, ch.fd().intValue()); + } } } @@ -184,7 +162,7 @@ final class EpollEventLoop extends SingleThreadEventLoop { this.ioRatio = ioRatio; } - private int epollWait(boolean oldWakenUp) { + private int epollWait(boolean oldWakenUp) throws IOException { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); @@ -275,9 +253,9 @@ final class EpollEventLoop extends SingleThreadEventLoop { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } - if (allowGrowing && ready == events.length) { - // double the size of the array as we needed the whole space for the events - events = new long[events.length << 1]; + if (allowGrowing && ready == events.length()) { + //increase the size of the array as we needed the whole space for the events + events.increase(); } if (isShuttingDown()) { closeAll(); @@ -302,41 +280,52 @@ final class EpollEventLoop extends SingleThreadEventLoop { } private void closeAll() { - Native.epollWait(epollFd, events, 0); - Collection channels = new ArrayList(ids.size()); + try { + Native.epollWait(epollFd, events, 0); + } catch (IOException ignore) { + // ignore on close + } + Collection array = new ArrayList(channels.size()); - for (IntObjectMap.Entry entry: ids.entries()) { - channels.add(entry.value()); + for (IntObjectMap.Entry entry: channels.entries()) { + array.add(entry.value()); } - for (AbstractEpollChannel ch: channels) { + for (AbstractEpollChannel ch: array) { ch.unsafe().close(ch.unsafe().voidPromise()); } } - private void processReady(long[] events, int ready) { + private void processReady(EpollEventArray events, int ready) { for (int i = 0; i < ready; i ++) { - final long ev = events[i]; - - int id = (int) (ev >> 32L); - if (id == 0) { + final int fd = events.fd(i); + if (fd == eventFd) { // consume wakeup event Native.eventFdRead(eventFd); } else { - AbstractEpollChannel ch = ids.get(id); - if (ch != null) { + final long ev = events.events(i); + + AbstractEpollChannel ch = channels.get(fd); + if (ch != null && ch.isOpen()) { + boolean close = (ev & Native.EPOLLRDHUP) != 0; + boolean read = (ev & Native.EPOLLIN) != 0; + boolean write = (ev & Native.EPOLLOUT) != 0; + AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe(); - if ((ev & Native.EPOLLOUT) != 0 && ch.isOpen()) { + if (write) { // force flush of data as the epoll is writable again unsafe.epollOutReady(); } - if ((ev & Native.EPOLLIN) != 0 && ch.isOpen()) { + if (read) { // Something is ready to read, so consume it now unsafe.epollInReady(); } - if ((ev & Native.EPOLLRDHUP) != 0 && ch.isOpen()) { + if (close) { unsafe.epollRdHupReady(); } + } else { + // We received an event for an fd which we not use anymore. Remove it from the epoll_event set. + Native.epollCtlDel(epollFd, fd); } } } @@ -345,14 +334,19 @@ final class EpollEventLoop extends SingleThreadEventLoop { @Override protected void cleanup() { try { - Native.close(epollFd); - } catch (IOException e) { - logger.warn("Failed to close the epoll fd.", e); - } - try { - Native.close(eventFd); - } catch (IOException e) { - logger.warn("Failed to close the event fd.", e); + try { + Native.close(epollFd); + } catch (IOException e) { + logger.warn("Failed to close the epoll fd.", e); + } + try { + Native.close(eventFd); + } catch (IOException e) { + logger.warn("Failed to close the event fd.", e); + } + } finally { + // release native memory + events.free(); } } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 36d1e37e4e..f1b0407eeb 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -37,6 +37,8 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme EpollSocketChannel(Channel parent, int fd) { super(parent, fd); + // Add EPOLLRDHUP so we are notified once the remote peer close the connection. + flags |= Native.EPOLLRDHUP; config = new EpollSocketChannelConfig(this); // Directly cache the remote and local addresses // See https://github.com/netty/netty/issues/2359 @@ -46,6 +48,8 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme public EpollSocketChannel() { super(Native.socketStreamFd()); + // Add EPOLLRDHUP so we are notified once the remote peer close the connection. + flags |= Native.EPOLLRDHUP; config = new EpollSocketChannelConfig(this); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index 2caebc6336..1b0fd2c459 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -49,10 +49,10 @@ final class Native { } // EventLoop operations and constants - public static final int EPOLLIN = 0x01; - public static final int EPOLLOUT = 0x02; - public static final int EPOLLRDHUP = 0x04; - public static final int EPOLLET = 0x08; + public static final int EPOLLIN = epollin(); + public static final int EPOLLOUT = epollout(); + public static final int EPOLLRDHUP = epollrdhup(); + public static final int EPOLLET = epollet(); public static final int IOV_MAX = iovMax(); public static final int UIO_MAX_IOV = uioMaxIov(); @@ -134,9 +134,18 @@ final class Native { public static native void eventFdWrite(int fd, long value); public static native void eventFdRead(int fd); public static native int epollCreate(); - public static native int epollWait(int efd, long[] events, int timeout); - public static native void epollCtlAdd(int efd, final int fd, final int flags, final int id); - public static native void epollCtlMod(int efd, final int fd, final int flags, final int id); + public static int epollWait(int efd, EpollEventArray events, int timeout) throws IOException { + int ready = epollWait0(efd, events.memoryAddress(), events.length(), timeout); + if (ready < 0) { + throw newIOException("epoll_wait", ready); + } + return ready; + } + private static native int epollWait0(int efd, long address, int len, int timeout); + + public static native void epollCtlAdd(int efd, final int fd, final int flags); + + public static native void epollCtlMod(int efd, final int fd, final int flags); public static native void epollCtlDel(int efd, final int fd); private static native int errnoEBADF(); @@ -623,6 +632,15 @@ final class Native { private static native int uioMaxIov(); + // epoll_event related + public static native int sizeofEpollEvent(); + public static native int offsetofEpollData(); + + private static native int epollin(); + private static native int epollout(); + private static native int epollrdhup(); + private static native int epollet(); + private Native() { // utility }