Faster event processing when epoll transport is used

Motivation:

Before we used a long[] to store the ready events, this had a few problems and limitations:
 - An extra loop was needed to translate between epoll_event and our long
 - JNI may need to do extra memory copy if the JVM not supports pinning
 - More branches

Modifications:

- Introduce a EpollEventArray which allows to directly write in a struct epoll_event* and pass it to epoll_wait.

Result:

Better speed when using native transport, as shown in the benchmark.

Before:
[xxx@xxx wrk]$ ./wrk -H 'Connection: keep-alive' -d 120 -c 256 -t 16 -s scripts/pipeline-many.lua  http://xxx:8080/plaintext
Running 2m test @ http://xxx:8080/plaintext
 16 threads and 256 connections
 Thread Stats   Avg      Stdev     Max   +/- Stdev
   Latency    14.56ms    8.64ms 117.15ms   80.58%
   Req/Sec   286.17k    38.71k  421.48k    68.17%
 546324329 requests in 2.00m, 73.78GB read
Requests/sec: 4553438.39
Transfer/sec:    629.66MB

After:
[xxx@xxx wrk]$ ./wrk -H 'Connection: keep-alive' -d 120 -c 256 -t 16 -s scripts/pipeline-many.lua  http://xxx:8080/plaintext
Running 2m test @ http://xxx:8080/plaintext
 16 threads and 256 connections
 Thread Stats   Avg      Stdev     Max   +/- Stdev
   Latency    14.12ms    8.69ms 100.40ms   83.08%
   Req/Sec   294.79k    40.23k  472.70k    66.75%
 555997226 requests in 2.00m, 75.08GB read
Requests/sec: 4634343.40
Transfer/sec:    640.85MB
This commit is contained in:
Norman Maurer 2015-02-02 11:51:19 +01:00
parent 8b2fb2b985
commit 585ce1593f
7 changed files with 237 additions and 143 deletions

View File

@ -30,6 +30,7 @@
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/utsname.h>
#include <stddef.h>
#include "io_netty_channel_epoll_Native.h"
// optional
@ -104,20 +105,12 @@ char* exceptionMessage(char* msg, int error) {
return result;
}
jint epollCtl(JNIEnv* env, jint efd, int op, jint fd, jint flags, jint id) {
uint32_t events = (flags & EPOLL_EDGE) ? EPOLLET : 0;
if (flags & EPOLL_READ) {
events |= EPOLLIN | EPOLLRDHUP;
}
if (flags & EPOLL_WRITE) {
events |= EPOLLOUT;
}
jint epollCtl(JNIEnv* env, jint efd, int op, jint fd, jint flags) {
uint32_t events = flags;
struct epoll_event ev = {
.events = events,
// encode the id into the events
.data.u64 = (((uint64_t) id) << 32L)
.data.fd = fd,
.events = events
};
return epoll_ctl(efd, op, fd, &ev);
@ -620,9 +613,8 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollCreate(JNIEnv* en
return efd;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollWait(JNIEnv* env, jclass clazz, jint efd, jlongArray events, jint timeout) {
int len = (*env)->GetArrayLength(env, events);
struct epoll_event ev[len];
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timeout) {
struct epoll_event *ev = (struct epoll_event*) address;
int ready;
int err;
do {
@ -631,60 +623,20 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollWait(JNIEnv* env,
} while (ready == -1 && ((err = errno) == EINTR));
if (ready < 0) {
throwIOException(env, exceptionMessage("epoll_wait() failed: ", err));
return -1;
return -err;
}
if (ready == 0) {
// nothing ready for process
return 0;
}
jboolean isCopy;
// Use GetPrimitiveArrayCritical and ReleasePrimitiveArrayCritical to signal the VM that we really would like
// to not do a memory copy here. This is ok as we not do any blocking action here anyway.
// This is important as the VM may suspend GC for the time!
jlong* elements = (*env)->GetPrimitiveArrayCritical(env, events, &isCopy);
if (elements == NULL) {
// No memory left ?!?!?
throwOutOfMemoryError(env);
return -1;
}
int i;
for (i = 0; i < ready; i++) {
// store the ready ops and id
elements[i] = (jlong) ev[i].data.u64;
if (ev[i].events & EPOLLIN) {
elements[i] |= EPOLL_READ;
}
if (ev[i].events & EPOLLRDHUP) {
elements[i] |= EPOLL_RDHUP;
}
if (ev[i].events & EPOLLOUT) {
elements[i] |= EPOLL_WRITE;
}
}
jint mode;
// release again to prevent memory leak
if (isCopy) {
mode = 0;
} else {
// was just pinned so use JNI_ABORT to eliminate not needed copy.
mode = JNI_ABORT;
}
(*env)->ReleasePrimitiveArrayCritical(env, events, elements, mode);
return ready;
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlAdd(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags, jint id) {
if (epollCtl(env, efd, EPOLL_CTL_ADD, fd, flags, id) < 0) {
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlAdd(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags) {
if (epollCtl(env, efd, EPOLL_CTL_ADD, fd, flags) < 0) {
int err = errno;
throwRuntimeException(env, exceptionMessage("epoll_ctl() failed: ", err));
}
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlMod(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags, jint id) {
if (epollCtl(env, efd, EPOLL_CTL_MOD, fd, flags, id) < 0) {
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlMod(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags) {
if (epollCtl(env, efd, EPOLL_CTL_MOD, fd, flags) < 0) {
int err = errno;
throwRuntimeException(env, exceptionMessage("epoll_ctl() failed: ", err));
}
@ -1526,4 +1478,26 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendFd0(JNIEnv* env, j
return -1;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollet(JNIEnv* env, jclass clazz) {
return EPOLLET;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollin(JNIEnv* env, jclass clazz) {
return EPOLLIN;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollout(JNIEnv* env, jclass clazz) {
return EPOLLOUT;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollrdhup(JNIEnv* env, jclass clazz) {
return EPOLLRDHUP;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sizeofEpollEvent(JNIEnv* env, jclass clazz) {
return sizeof(struct epoll_event);
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_offsetofEpollData(JNIEnv* env, jclass clazz) {
return offsetof(struct epoll_event, data);
}

View File

@ -16,12 +16,6 @@
#include <jni.h>
#include <limits.h>
#define EPOLL_READ 0x01
#define EPOLL_WRITE 0x02
#define EPOLL_RDHUP 0x04
#define EPOLL_EDGE 0x08
// Define SO_REUSEPORT if not found to fix build issues.
// See https://github.com/netty/netty/issues/2558
#ifndef SO_REUSEPORT
@ -43,9 +37,9 @@ jint Java_io_netty_channel_epoll_Native_eventFd(JNIEnv* env, jclass clazz);
void Java_io_netty_channel_epoll_Native_eventFdWrite(JNIEnv* env, jclass clazz, jint fd, jlong value);
void Java_io_netty_channel_epoll_Native_eventFdRead(JNIEnv* env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_epollCreate(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_epollWait(JNIEnv* env, jclass clazz, jint efd, jlongArray events, jint timeout);
void Java_io_netty_channel_epoll_Native_epollCtlAdd(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags, jint id);
void Java_io_netty_channel_epoll_Native_epollCtlMod(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags, jint id);
jint Java_io_netty_channel_epoll_Native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint length, jint timeout);
void Java_io_netty_channel_epoll_Native_epollCtlAdd(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags);
void Java_io_netty_channel_epoll_Native_epollCtlMod(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags);
void Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv* env, jclass clazz, jint efd, jint fd);
jint Java_io_netty_channel_epoll_Native_write0(JNIEnv* env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit);
jint Java_io_netty_channel_epoll_Native_writeAddress0(JNIEnv* env, jclass clazz, jint fd, jlong address, jint pos, jint limit);
@ -117,3 +111,10 @@ jint Java_io_netty_channel_epoll_Native_errnoEAGAIN(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_errnoEWOULDBLOCK(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_errnoEINPROGRESS(JNIEnv* env, jclass clazz);
jstring Java_io_netty_channel_epoll_Native_strError(JNIEnv* env, jclass clazz, jint err);
jint Java_io_netty_channel_epoll_Native_epollin(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_epollout(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_epollrdhup(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_epollet(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_sizeofEpollEvent(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_offsetofEpollData(JNIEnv* env, jclass clazz);

View File

@ -38,7 +38,6 @@ abstract class AbstractEpollChannel extends AbstractChannel {
protected int flags = Native.EPOLLET;
protected volatile boolean active;
int id;
AbstractEpollChannel(int fd, int flag) {
this(null, fd, flag, false);

View File

@ -0,0 +1,104 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.util.internal.PlatformDependent;
/**
* This is an internal datastructure which can be directly passed to epoll_wait to reduce the overhead.
*
* typedef union epoll_data {
* void *ptr;
* int fd;
* uint32_t u32;
* uint64_t u64;
* } epoll_data_t;
*
* struct epoll_event {
* uint32_t events; // Epoll events
* epoll_data_t data; // User data variable
* };
*
* We use {@code fd} if the {@code epoll_data union} to store the actual file descriptor of an
* {@link AbstractEpollChannel} and so be able to map it later.
*/
final class EpollEventArray {
// Size of the epoll_event struct
private static final int EPOLL_EVENT_SIZE = Native.sizeofEpollEvent();
// The offsiet of the data union in the epoll_event struct
private static final int EPOLL_DATA_OFFSET = Native.offsetofEpollData();
private long memoryAddress;
private int length;
EpollEventArray(int length) {
if (length < 1) {
throw new IllegalArgumentException("length must be >= 1 but was " + length);
}
this.length = length;
memoryAddress = allocate(length);
}
private static long allocate(int length) {
return PlatformDependent.allocateMemory(length * EPOLL_EVENT_SIZE);
}
/**
* Return the {@code memoryAddress} which points to the start of this {@link EpollEventArray}.
*/
long memoryAddress() {
return memoryAddress;
}
/**
* Return the length of the {@link EpollEventArray} which represent the maximum number of {@code epoll_events}
* that can be stored in it.
*/
int length() {
return length;
}
/**
* Increase the storage of this {@link EpollEventArray}.
*/
void increase() {
// double the size
length <<= 1;
free();
memoryAddress = allocate(length);
}
/**
* Free this {@link EpollEventArray}. Any usage after calling this method may segfault the JVM!
*/
void free() {
PlatformDependent.freeMemory(memoryAddress);
}
/**
* Return the events for the {@code epoll_event} on this index.
*/
int events(int index) {
return PlatformDependent.getInt(memoryAddress + index * EPOLL_EVENT_SIZE);
}
/**
* Return the file descriptor for the {@code epoll_event} on this index.
*/
int fd(int index) {
return PlatformDependent.getInt(memoryAddress + index * EPOLL_EVENT_SIZE + EPOLL_DATA_OFFSET);
}
}

View File

@ -50,11 +50,9 @@ final class EpollEventLoop extends SingleThreadEventLoop {
private final int epollFd;
private final int eventFd;
private final IntObjectMap<AbstractEpollChannel> ids = new IntObjectHashMap<AbstractEpollChannel>();
private final IntObjectMap<AbstractEpollChannel> channels = new IntObjectHashMap<AbstractEpollChannel>(4096);
private final boolean allowGrowing;
private long[] events;
private int id;
private final EpollEventArray events;
@SuppressWarnings("unused")
private volatile int wakenUp;
@ -64,10 +62,10 @@ final class EpollEventLoop extends SingleThreadEventLoop {
super(parent, threadFactory, false);
if (maxEvents == 0) {
allowGrowing = true;
events = new long[128];
events = new EpollEventArray(4096);
} else {
allowGrowing = false;
events = new long[maxEvents];
events = new EpollEventArray(maxEvents);
}
boolean success = false;
int epollFd = -1;
@ -75,7 +73,7 @@ final class EpollEventLoop extends SingleThreadEventLoop {
try {
this.epollFd = epollFd = Native.epollCreate();
this.eventFd = eventFd = Native.eventFd();
Native.epollCtlAdd(epollFd, eventFd, Native.EPOLLIN, 0);
Native.epollCtlAdd(epollFd, eventFd, Native.EPOLLIN);
success = true;
} finally {
if (!success) {
@ -97,29 +95,6 @@ final class EpollEventLoop extends SingleThreadEventLoop {
}
}
private int nextId() {
int id = this.id;
if (id == Integer.MAX_VALUE) {
// We used all possible ints in the past ( 1 - Integer.MAX_VALUE), time to scrub the stored channels
// and re-assign ids.
AbstractEpollChannel[] channels = ids.values(AbstractEpollChannel.class);
ids.clear();
id = 0;
for (AbstractEpollChannel ch: channels) {
id++;
ch.id = id;
ids.put(ch.id, ch);
}
if (id == Integer.MAX_VALUE) {
throw new IllegalStateException("Could not scrub ids");
}
}
this.id = ++id;
return id;
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
@ -133,10 +108,9 @@ final class EpollEventLoop extends SingleThreadEventLoop {
*/
void add(AbstractEpollChannel ch) {
assert inEventLoop();
int id = nextId();
Native.epollCtlAdd(epollFd, ch.fd().intValue(), ch.flags, id);
ch.id = id;
ids.put(id, ch);
int fd = ch.fd().intValue();
Native.epollCtlAdd(epollFd, fd, ch.flags);
channels.put(fd, ch);
}
/**
@ -144,7 +118,7 @@ final class EpollEventLoop extends SingleThreadEventLoop {
*/
void modify(AbstractEpollChannel ch) {
assert inEventLoop();
Native.epollCtlMod(epollFd, ch.fd().intValue(), ch.flags, ch.id);
Native.epollCtlMod(epollFd, ch.fd().intValue(), ch.flags);
}
/**
@ -152,10 +126,14 @@ final class EpollEventLoop extends SingleThreadEventLoop {
*/
void remove(AbstractEpollChannel ch) {
assert inEventLoop();
if (ids.remove(ch.id) != null && ch.isOpen()) {
// Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
// removed once the file-descriptor is closed.
Native.epollCtlDel(epollFd, ch.fd().intValue());
if (ch.isOpen()) {
int fd = ch.fd().intValue();
if (channels.remove(fd) != null) {
// Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
// removed once the file-descriptor is closed.
Native.epollCtlDel(epollFd, ch.fd().intValue());
}
}
}
@ -183,7 +161,7 @@ final class EpollEventLoop extends SingleThreadEventLoop {
this.ioRatio = ioRatio;
}
private int epollWait(boolean oldWakenUp) {
private int epollWait(boolean oldWakenUp) throws IOException {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
@ -275,9 +253,9 @@ final class EpollEventLoop extends SingleThreadEventLoop {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (allowGrowing && ready == events.length) {
// double the size of the array as we needed the whole space for the events
events = new long[events.length << 1];
if (allowGrowing && ready == events.length()) {
//increase the size of the array as we needed the whole space for the events
events.increase();
}
if (isShuttingDown()) {
closeAll();
@ -300,41 +278,52 @@ final class EpollEventLoop extends SingleThreadEventLoop {
}
private void closeAll() {
Native.epollWait(epollFd, events, 0);
Collection<AbstractEpollChannel> channels = new ArrayList<AbstractEpollChannel>(ids.size());
try {
Native.epollWait(epollFd, events, 0);
} catch (IOException ignore) {
// ignore on close
}
Collection<AbstractEpollChannel> array = new ArrayList<AbstractEpollChannel>(channels.size());
for (IntObjectMap.Entry<AbstractEpollChannel> entry: ids.entries()) {
channels.add(entry.value());
for (IntObjectMap.Entry<AbstractEpollChannel> entry: channels.entries()) {
array.add(entry.value());
}
for (AbstractEpollChannel ch: channels) {
for (AbstractEpollChannel ch: array) {
ch.unsafe().close(ch.unsafe().voidPromise());
}
}
private void processReady(long[] events, int ready) {
private void processReady(EpollEventArray events, int ready) {
for (int i = 0; i < ready; i ++) {
final long ev = events[i];
int id = (int) (ev >> 32L);
if (id == 0) {
final int fd = events.fd(i);
if (fd == eventFd) {
// consume wakeup event
Native.eventFdRead(eventFd);
} else {
AbstractEpollChannel ch = ids.get(id);
if (ch != null) {
final long ev = events.events(i);
AbstractEpollChannel ch = channels.get(fd);
if (ch != null && ch.isOpen()) {
boolean close = (ev & Native.EPOLLRDHUP) != 0;
boolean read = (ev & Native.EPOLLIN) != 0;
boolean write = (ev & Native.EPOLLOUT) != 0;
AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
if ((ev & Native.EPOLLOUT) != 0 && ch.isOpen()) {
if (write) {
// force flush of data as the epoll is writable again
unsafe.epollOutReady();
}
if ((ev & Native.EPOLLIN) != 0 && ch.isOpen()) {
if (read) {
// Something is ready to read, so consume it now
unsafe.epollInReady();
}
if ((ev & Native.EPOLLRDHUP) != 0 && ch.isOpen()) {
if (close) {
unsafe.epollRdHupReady();
}
} else {
// We received an event for an fd which we not use anymore. Remove it from the epoll_event set.
Native.epollCtlDel(epollFd, fd);
}
}
}
@ -343,14 +332,19 @@ final class EpollEventLoop extends SingleThreadEventLoop {
@Override
protected void cleanup() {
try {
Native.close(epollFd);
} catch (IOException e) {
logger.warn("Failed to close the epoll fd.", e);
}
try {
Native.close(eventFd);
} catch (IOException e) {
logger.warn("Failed to close the event fd.", e);
try {
Native.close(epollFd);
} catch (IOException e) {
logger.warn("Failed to close the epoll fd.", e);
}
try {
Native.close(eventFd);
} catch (IOException e) {
logger.warn("Failed to close the event fd.", e);
}
} finally {
// release native memory
events.free();
}
}
}

View File

@ -37,6 +37,8 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
EpollSocketChannel(Channel parent, int fd) {
super(parent, fd);
// Add EPOLLRDHUP so we are notified once the remote peer close the connection.
flags |= Native.EPOLLRDHUP;
config = new EpollSocketChannelConfig(this);
// Directly cache the remote and local addresses
// See https://github.com/netty/netty/issues/2359
@ -46,6 +48,8 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
public EpollSocketChannel() {
super(Native.socketStreamFd());
// Add EPOLLRDHUP so we are notified once the remote peer close the connection.
flags |= Native.EPOLLRDHUP;
config = new EpollSocketChannelConfig(this);
}

View File

@ -49,10 +49,10 @@ final class Native {
}
// EventLoop operations and constants
public static final int EPOLLIN = 0x01;
public static final int EPOLLOUT = 0x02;
public static final int EPOLLRDHUP = 0x04;
public static final int EPOLLET = 0x08;
public static final int EPOLLIN = epollin();
public static final int EPOLLOUT = epollout();
public static final int EPOLLRDHUP = epollrdhup();
public static final int EPOLLET = epollet();
public static final int IOV_MAX = iovMax();
public static final int UIO_MAX_IOV = uioMaxIov();
@ -134,9 +134,18 @@ final class Native {
public static native void eventFdWrite(int fd, long value);
public static native void eventFdRead(int fd);
public static native int epollCreate();
public static native int epollWait(int efd, long[] events, int timeout);
public static native void epollCtlAdd(int efd, final int fd, final int flags, final int id);
public static native void epollCtlMod(int efd, final int fd, final int flags, final int id);
public static int epollWait(int efd, EpollEventArray events, int timeout) throws IOException {
int ready = epollWait0(efd, events.memoryAddress(), events.length(), timeout);
if (ready < 0) {
throw newIOException("epoll_wait", ready);
}
return ready;
}
private static native int epollWait0(int efd, long address, int len, int timeout);
public static native void epollCtlAdd(int efd, final int fd, final int flags);
public static native void epollCtlMod(int efd, final int fd, final int flags);
public static native void epollCtlDel(int efd, final int fd);
private static native int errnoEBADF();
@ -623,6 +632,15 @@ final class Native {
private static native int uioMaxIov();
// epoll_event related
public static native int sizeofEpollEvent();
public static native int offsetofEpollData();
private static native int epollin();
private static native int epollout();
private static native int epollrdhup();
private static native int epollet();
private Native() {
// utility
}