From 9330172f803f340df677c370464126cd6112204a Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sat, 15 Feb 2014 22:26:36 +0100 Subject: [PATCH] Introduce a native transport for linux using epoll ET This transport use JNI (C) to directly make use of epoll in Edge-Triggered mode for maximal performance on Linux. Beside this it also support using TCP_CORK and produce less GC then the NIO transport using JDK NIO. It only builds on linux and skip the build if linux is not used. The transport produce a jar which contains all needed .so files for 32bit and 64 bit. The user only need to include the jar as dependency as usually to make use of it and use the correct classes. This includes also some cleanup of @trustin --- .../util/internal/NativeLibraryLoader.java | 149 +++ .../util/internal/PlatformDependent.java | 143 +++ pom.xml | 17 +- transport-native-epoll/README.md | 3 + transport-native-epoll/pom.xml | 113 +++ .../main/c/io_netty_channel_epoll_Native.c | 859 ++++++++++++++++++ .../main/c/io_netty_channel_epoll_Native.h | 60 ++ .../channel/epoll/AbstractEpollChannel.java | 163 ++++ .../channel/epoll/EpollChannelOption.java | 26 + .../netty/channel/epoll/EpollEventLoop.java | 356 ++++++++ .../channel/epoll/EpollEventLoopGroup.java | 74 ++ .../epoll/EpollServerSocketChannel.java | 124 +++ .../epoll/EpollServerSocketChannelConfig.java | 176 ++++ .../channel/epoll/EpollSocketChannel.java | 590 ++++++++++++ .../epoll/EpollSocketChannelConfig.java | 280 ++++++ .../java/io/netty/channel/epoll/Native.java | 136 +++ .../io/netty/channel/epoll/package-info.java | 21 + .../src/main/native-package/m4/custom.m4 | 47 + .../channel/epoll/EpollSocketEchoTest.java | 31 + .../epoll/EpollSocketFileRegionTest.java | 31 + .../epoll/EpollSocketFixedLengthEchoTest.java | 31 + .../epoll/EpollSocketGatheringWriteTest.java | 31 + .../epoll/EpollSocketObjectEchoTest.java | 31 + .../channel/epoll/EpollSocketSslEchoTest.java | 39 + .../epoll/EpollSocketStartTlsTest.java | 31 + .../epoll/EpollSocketStringEchoTest.java | 31 + .../netty/channel/epoll/EpollTestUtils.java | 46 + 27 files changed, 3638 insertions(+), 1 deletion(-) create mode 100644 common/src/main/java/io/netty/util/internal/NativeLibraryLoader.java create mode 100644 transport-native-epoll/README.md create mode 100644 transport-native-epoll/pom.xml create mode 100644 transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c create mode 100644 transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/package-info.java create mode 100644 transport-native-epoll/src/main/native-package/m4/custom.m4 create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketEchoTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFileRegionTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFixedLengthEchoTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketGatheringWriteTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketObjectEchoTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslEchoTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStartTlsTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStringEchoTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollTestUtils.java diff --git a/common/src/main/java/io/netty/util/internal/NativeLibraryLoader.java b/common/src/main/java/io/netty/util/internal/NativeLibraryLoader.java new file mode 100644 index 0000000000..b0b7244989 --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/NativeLibraryLoader.java @@ -0,0 +1,149 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.internal; + +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.util.Locale; +import java.util.regex.Pattern; + +/** + * Helper class to load JNI resources. + * + */ +public final class NativeLibraryLoader { + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(NativeLibraryLoader.class); + + private static final Pattern REPLACE = Pattern.compile("\\W+"); + private static final File WORKDIR; + + static { + String workdir = SystemPropertyUtil.get("io.netty.native.workdir"); + if (workdir != null) { + File f = new File(workdir); + if (!f.exists()) { + // ok to ignore as createTempFile will take care + //noinspection ResultOfMethodCallIgnored + f.mkdirs(); + } + + try { + f = f.getAbsoluteFile(); + } catch (Exception ignored) { + // Good to have an absolute path, but it's OK. + } + + WORKDIR = f; + logger.debug("-Dio.netty.netty.workdir: {}", WORKDIR); + } else { + WORKDIR = PlatformDependent.tmpdir(); + logger.debug("-Dio.netty.netty.workdir: {} (io.netty.tmpdir)", WORKDIR); + } + } + + /** + * Load the given library with the specified {@link java.lang.ClassLoader} + */ + public static void load(String name, ClassLoader loader) { + String libname = System.mapLibraryName(name); + String path = "META-INF/native/" + osIdentifier() + PlatformDependent.bitMode() + '/' + libname; + + URL url = loader.getResource(path); + if (url == null) { + // Fall back to normal loading of JNI stuff + System.loadLibrary(name); + } else { + int index = libname.lastIndexOf('.'); + String prefix = libname.substring(0, index); + String suffix = libname.substring(index, libname.length()); + InputStream in = null; + OutputStream out = null; + File tmpFile = null; + boolean loaded = false; + try { + tmpFile = File.createTempFile(prefix, suffix, WORKDIR); + in = url.openStream(); + out = new FileOutputStream(tmpFile); + + byte[] buffer = new byte[8192]; + int length; + while ((length = in.read(buffer)) > 0) { + out.write(buffer, 0, length); + } + out.flush(); + out.close(); + out = null; + + System.load(tmpFile.getPath()); + loaded = true; + } catch (Exception e) { + throw (UnsatisfiedLinkError) new UnsatisfiedLinkError( + "could not load a native library: " + name).initCause(e); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException ignore) { + // ignore + } + } + if (out != null) { + try { + out.close(); + } catch (IOException ignore) { + // ignore + } + } + if (tmpFile != null) { + if (loaded) { + tmpFile.deleteOnExit(); + } else { + if (!tmpFile.delete()) { + tmpFile.deleteOnExit(); + } + } + } + } + } + } + + private static String osIdentifier() { + String name = SystemPropertyUtil.get("os.name", "unknown").toLowerCase(Locale.US).trim(); + if (name.startsWith("win")) { + return "windows"; + } + if (name.startsWith("mac os x")) { + return "osx"; + } + if (name.startsWith("linux")) { + return "linux"; + } + + return REPLACE.matcher(name).replaceAll("_"); + } + + private NativeLibraryLoader() { + // Utility + } +} diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent.java b/common/src/main/java/io/netty/util/internal/PlatformDependent.java index a414c578c0..2b2de647a3 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent.java @@ -21,6 +21,7 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.io.BufferedReader; +import java.io.File; import java.io.IOException; import java.io.InputStreamReader; import java.lang.reflect.Field; @@ -74,6 +75,10 @@ public final class PlatformDependent { private static final boolean HAS_JAVASSIST = hasJavassist0(); + private static final File TMPDIR = tmpdir0(); + + private static final int BIT_MODE = bitMode0(); + static { if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.noPreferDirect: {}", !DIRECT_BUFFER_PREFERRED); @@ -153,6 +158,20 @@ public final class PlatformDependent { return HAS_JAVASSIST; } + /** + * Returns the temporary directory. + */ + public static File tmpdir() { + return TMPDIR; + } + + /** + * Returns the bit mode of the current VM (usually 32 or 64.) + */ + public static int bitMode() { + return BIT_MODE; + } + /** * Raises an exception bypassing compiler checks for checked exceptions. */ @@ -638,6 +657,130 @@ public final class PlatformDependent { } } + private static File tmpdir0() { + File f; + try { + f = toDirectory(SystemPropertyUtil.get("io.netty.tmpdir")); + if (f != null) { + logger.debug("-Dio.netty.tmpdir: {}", f); + return f; + } + + f = toDirectory(SystemPropertyUtil.get("java.io.tmpdir")); + if (f != null) { + logger.debug("-Dio.netty.tmpdir: {} (java.io.tmpdir)", f); + return f; + } + + // This shouldn't happen, but just in case .. + if (isWindows()) { + f = toDirectory(System.getenv("TEMP")); + if (f != null) { + logger.debug("-Dio.netty.tmpdir: {} (%TEMP%)", f); + return f; + } + + String userprofile = System.getenv("USERPROFILE"); + if (userprofile != null) { + f = toDirectory(userprofile + "\\AppData\\Local\\Temp"); + if (f != null) { + logger.debug("-Dio.netty.tmpdir: {} (%USERPROFILE%\\AppData\\Local\\Temp)", f); + return f; + } + + f = toDirectory(userprofile + "\\Local Settings\\Temp"); + if (f != null) { + logger.debug("-Dio.netty.tmpdir: {} (%USERPROFILE%\\Local Settings\\Temp)", f); + return f; + } + } + } else { + f = toDirectory(System.getenv("TMPDIR")); + if (f != null) { + logger.debug("-Dio.netty.tmpdir: {} ($TMPDIR)", f); + return f; + } + } + } catch (Exception ignored) { + // Environment variable inaccessible + } + + // Last resort. + if (isWindows()) { + f = new File("C:\\Windows\\Temp"); + } else { + f = new File("/tmp"); + } + + logger.warn("Failed to get the temporary directory; falling back to: {}", f); + return f; + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + private static File toDirectory(String path) { + if (path == null) { + return null; + } + + File f = new File(path); + if (!f.exists()) { + f.mkdirs(); + } + + if (!f.isDirectory()) { + return null; + } + + try { + return f.getAbsoluteFile(); + } catch (Exception ignored) { + return f; + } + } + + private static int bitMode0() { + // Check user-specified bit mode first. + int bitMode = SystemPropertyUtil.getInt("io.netty.bitMode", 0); + if (bitMode > 0) { + logger.debug("-Dio.netty.bitMode: {}", bitMode); + return bitMode; + } + + // And then the vendor specific ones which is probably most reliable. + bitMode = SystemPropertyUtil.getInt("sun.arch.data.model", 0); + if (bitMode > 0) { + logger.debug("-Dio.netty.bitMode: {} (sun.arch.data.model)", bitMode); + return bitMode; + } + bitMode = SystemPropertyUtil.getInt("com.ibm.vm.bitmode", 0); + if (bitMode > 0) { + logger.debug("-Dio.netty.bitMode: {} (com.ibm.vm.bitmode)", bitMode); + return bitMode; + } + + // os.arch also gives us a good hint. + String arch = SystemPropertyUtil.get("os.arch", "").toLowerCase(Locale.US).trim(); + if ("amd64".equals(arch) || "x86_64".equals(arch)) { + bitMode = 64; + } else if ("i386".equals(arch) || "i486".equals(arch) || "i586".equals(arch) || "i686".equals(arch)) { + bitMode = 32; + } + + if (bitMode > 0) { + logger.debug("-Dio.netty.bitMode: {} (os.arch: {})", bitMode, arch); + } + + // Last resort: guess from VM name and then fall back to most common 64-bit mode. + String vm = SystemPropertyUtil.get("java.vm.name", "").toLowerCase(Locale.US); + Pattern BIT_PATTERN = Pattern.compile("([1-9][0-9]+)-?bit"); + Matcher m = BIT_PATTERN.matcher(vm); + if (m.find()) { + return Integer.parseInt(m.group(1)); + } else { + return 64; + } + } + private PlatformDependent() { // only static method supported } diff --git a/pom.xml b/pom.xml index 7b51c674fa..624f598997 100644 --- a/pom.xml +++ b/pom.xml @@ -120,6 +120,17 @@ false + + linux-native + + + linux + + + + transport-native-epoll + + @@ -499,6 +510,10 @@ manifest + + jar + bundle + ${project.groupId}.* @@ -565,7 +580,7 @@ 2.4.2 false - -P release,sonatype-oss-release,full,no-osgi + -P release,sonatype-oss-release,full,no-osgi,linux-native true false netty-@{project.version} diff --git a/transport-native-epoll/README.md b/transport-native-epoll/README.md new file mode 100644 index 0000000000..43f97ee933 --- /dev/null +++ b/transport-native-epoll/README.md @@ -0,0 +1,3 @@ +# Native transport for Linux + +See [our wiki page](http://netty.io/wiki/native-transports.html). diff --git a/transport-native-epoll/pom.xml b/transport-native-epoll/pom.xml new file mode 100644 index 0000000000..b9aa83d462 --- /dev/null +++ b/transport-native-epoll/pom.xml @@ -0,0 +1,113 @@ + + + + 4.0.0 + + io.netty + netty-parent + 4.1.0.Alpha1-SNAPSHOT + + netty-transport-native-epoll + + Netty/Transport/Native/Epoll + jar + + + + io.netty + netty-common + ${project.version} + + + io.netty + netty-buffer + ${project.version} + + + io.netty + netty-transport + ${project.version} + + + io.netty + netty-testsuite + ${project.version} + test-jar + test + + + junit + junit + + + + + + + org.fusesource.hawtjni + maven-hawtjni-plugin + 1.10 + + + build-linux64 + + ${project.artifactId} + ${project.build.directory}/linux64 + ${nativeSourceDirectory} + ${libDirectory} + + --with-arch=x86_64 + + linux64 + true + true + + + generate + build + + compile + + + build-linux32 + + ${project.build.directory}/linux32 + ${nativeSourceDirectory} + ${libDirectory} + ${project.artifactId} + + --with-arch=i386 + + linux32 + true + true + + + generate + build + + compile + + + + + + + ${basedir}/src/main/c + ${basedir}/target/classes/ + + diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c new file mode 100644 index 0000000000..99c77e260d --- /dev/null +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c @@ -0,0 +1,859 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "io_netty_channel_epoll_Native.h" + + +// optional +extern int accept4(int sockFd, struct sockaddr *addr, socklen_t *addrlen, int flags) __attribute__((weak)); + +// Those are initialized in the init(...) method and cached for performance reasons +jmethodID posId = NULL; +jmethodID limitId = NULL; +jfieldID posFieldId = NULL; +jfieldID limitFieldId = NULL; +jfieldID fileChannelFieldId = NULL; +jfieldID transferedFieldId = NULL; +jfieldID fdFieldId = NULL; +jfieldID fileDescriptorFieldId = NULL; +jmethodID inetSocketAddrMethodId = NULL; +jclass runtimeExceptionClass = NULL; +jclass ioExceptionClass = NULL; +jclass closedChannelExceptionClass = NULL; +jmethodID closedChannelExceptionMethodId = NULL; +jclass inetSocketAddressClass = NULL; +static int socketType; + +// util methods +void throwRuntimeException(JNIEnv *env, char *message) { + (*env)->ThrowNew(env, runtimeExceptionClass, message); +} + +void throwIOException(JNIEnv *env, char *message) { + (*env)->ThrowNew(env, ioExceptionClass, message); +} + +void throwClosedChannelException(JNIEnv *env) { + jobject exception = (*env)->NewObject(env, closedChannelExceptionClass, closedChannelExceptionMethodId); + (*env)->Throw(env, exception); +} + +void throwOutOfMemoryError( JNIEnv *env, char *message) { + jclass exceptionClass = (*env)->FindClass(env, "java/lang/OutOfMemoryError"); + (*env)->ThrowNew(env, exceptionClass, message); +} + +char *exceptionMessage(char *msg, int error) { + char *err = strerror(error); + char *result = malloc(strlen(msg) + strlen(err) + 1); + strcpy(result, msg); + strcat(result, err); + return result; +} + +jint epollCtl(JNIEnv * env, jint efd, int op, jint fd, jint flags, jint id) { + uint32_t events = EPOLLET; + + if (flags & EPOLL_ACCEPT) { + events |= EPOLLIN; + } + if (flags & EPOLL_READ) { + events |= EPOLLIN | EPOLLRDHUP; + } + if (flags & EPOLL_WRITE) { + events |= EPOLLOUT; + } + + struct epoll_event ev = { + .events = events, + // encode the id into the events + .data.u64 = (((uint64_t) id) << 32L) + }; + + return epoll_ctl(efd, op, fd, &ev); +} + +jint getOption(JNIEnv *env, jint fd, int level, int optname, const void *optval, socklen_t optlen) { + int code; + code = getsockopt(fd, level, optname, &optval, &optlen); + if (code == 0) { + return 0; + } + int err = errno; + throwRuntimeException(env, exceptionMessage("Error during getsockopt(...): ", err)); + return code; +} + +int setOption(JNIEnv *env, jint fd, int level, int optname, const void *optval, socklen_t len) { + int rc = setsockopt(fd, level, optname, optval, len); + if (rc < 0) { + int err = errno; + throwRuntimeException(env, exceptionMessage("Error during setsockopt(...): ", err)); + } + return rc; +} + +jobject createInetSocketAddress(JNIEnv * env, struct sockaddr_storage addr) { + char ipstr[INET6_ADDRSTRLEN]; + int port; + if (addr.ss_family == AF_INET) { + struct sockaddr_in *s = (struct sockaddr_in *)&addr; + port = ntohs(s->sin_port); + inet_ntop(AF_INET, &s->sin_addr, ipstr, sizeof ipstr); + } else { + struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr; + port = ntohs(s->sin6_port); + inet_ntop(AF_INET6, &s->sin6_addr, ipstr, sizeof ipstr); + } + jobject socketAddr = (*env)->NewObject(env, inetSocketAddressClass, inetSocketAddrMethodId, ipstr, port); + return socketAddr; +} + +void init_sockaddr(JNIEnv * env, jbyteArray address, jint scopeId, jint jport, struct sockaddr_storage * addr) { + uint16_t port = htons((uint16_t) jport); + jbyte* addressBytes = (*env)->GetByteArrayElements(env, address, 0); + if (socketType == AF_INET6) { + struct sockaddr_in6* ip6addr = (struct sockaddr_in6 *) addr; + ip6addr->sin6_family = AF_INET6; + ip6addr->sin6_port = port; + + if (scopeId != 0) { + ip6addr->sin6_scope_id = (uint32_t) scopeId; + } + memcpy( &(ip6addr->sin6_addr.s6_addr), addressBytes, 16); + } else { + struct sockaddr_in* ipaddr = (struct sockaddr_in *) addr; + ipaddr->sin_family = AF_INET; + ipaddr->sin_port = port; + memcpy( &(ipaddr->sin_addr.s_addr), addressBytes + 12, 4); + } + + (*env)->ReleaseByteArrayElements(env, address, addressBytes, JNI_ABORT); +} + +static int socket_type() { + int fd = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0); + if (fd == -1) { + if (errno == EAFNOSUPPORT) { + return AF_INET; + } + return AF_INET6; + } else { + close(fd); + return AF_INET6; + } +} +// util methods end + +jint JNI_OnLoad(JavaVM* vm, void* reserved) { + JNIEnv* env; + if ((*vm)->GetEnv(vm, (void **) &env, JNI_VERSION_1_6) != JNI_OK) { + return JNI_ERR; + } else { + // cache classes that are used within other jni methods for performance reasons + jclass localClosedChannelExceptionClass = (*env)->FindClass(env, "java/nio/channels/ClosedChannelException"); + if (localClosedChannelExceptionClass == NULL) { + // pending exception... + return JNI_ERR; + } + closedChannelExceptionClass = (jclass) (*env)->NewGlobalRef(env, localClosedChannelExceptionClass); + if (closedChannelExceptionClass == NULL) { + // out-of-memory! + throwOutOfMemoryError(env, "Error allocating memory"); + return JNI_ERR; + } + closedChannelExceptionMethodId = (*env)->GetMethodID(env, closedChannelExceptionClass, "", "()V"); + if (closedChannelExceptionMethodId == NULL) { + throwRuntimeException(env, "Unable to obtain constructor of ClosedChannelException"); + return JNI_ERR; + } + jclass localRuntimeExceptionClass = (*env)->FindClass(env, "java/lang/RuntimeException"); + if (localRuntimeExceptionClass == NULL) { + // pending exception... + return JNI_ERR; + } + runtimeExceptionClass = (jclass) (*env)->NewGlobalRef(env, localRuntimeExceptionClass); + if (runtimeExceptionClass == NULL) { + // out-of-memory! + throwOutOfMemoryError(env, "Error allocating memory"); + return JNI_ERR; + } + + jclass localIoExceptionClass = (*env)->FindClass(env, "java/io/IOException"); + if (localIoExceptionClass == NULL) { + // pending exception... + return JNI_ERR; + } + ioExceptionClass = (jclass) (*env)->NewGlobalRef(env, localIoExceptionClass); + if (ioExceptionClass == NULL) { + // out-of-memory! + throwOutOfMemoryError(env, "Error allocating memory"); + return JNI_ERR; + } + + jclass localInetSocketAddressClass = (*env)->FindClass(env, "java/net/InetSocketAddress"); + if (localIoExceptionClass == NULL) { + // pending exception... + return JNI_ERR; + } + inetSocketAddressClass = (jclass) (*env)->NewGlobalRef(env, localInetSocketAddressClass); + if (inetSocketAddressClass == NULL) { + // out-of-memory! + throwOutOfMemoryError(env, "Error allocating memory"); + return JNI_ERR; + } + + void *mem = malloc(1); + if (mem == NULL) { + throwOutOfMemoryError(env, "Error allocating native buffer"); + return JNI_ERR; + } + jobject directBuffer = (*env)->NewDirectByteBuffer(env, mem, 1); + if (directBuffer == NULL) { + throwOutOfMemoryError(env, "Error allocating native buffer"); + return JNI_ERR; + } + + jclass cls = (*env)->GetObjectClass(env, directBuffer); + + // Get the method id for Buffer.position() and Buffer.limit(). These are used as fallback if + // it is not possible to obtain the position and limit using the fields directly. + posId = (*env)->GetMethodID(env, cls, "position", "()I"); + if (posId == NULL) { + // position method was not found.. something is wrong so bail out + throwRuntimeException(env, "Unable to find method ByteBuffer.position()"); + return JNI_ERR; + } + + limitId = (*env)->GetMethodID(env, cls, "limit", "()I"); + if (limitId == NULL) { + // limit method was not found.. something is wrong so bail out + throwRuntimeException(env, "Unable to find method ByteBuffer.limit()"); + return JNI_ERR; + } + + // Try to get the ids of the position and limit fields. We later then check if we was able + // to find them and if so use them get the position and limit of the buffer. This is + // much faster then call back into java via (*env)->CallIntMethod(...). + posFieldId = (*env)->GetFieldID(env, cls, "position", "I"); + if (posFieldId == NULL) { + // this is ok as we can still use the method so just clear the exception + (*env)->ExceptionClear(env); + } + limitFieldId = (*env)->GetFieldID(env, cls, "limit", "I"); + if (limitFieldId == NULL) { + // this is ok as we can still use the method so just clear the exception + (*env)->ExceptionClear(env); + } + jclass fileRegionCls = (*env)->FindClass(env, "io/netty/channel/DefaultFileRegion"); + if (fileRegionCls == NULL) { + // pending exception... + return JNI_ERR; + } + fileChannelFieldId = (*env)->GetFieldID(env, fileRegionCls, "file", "Ljava/nio/channels/FileChannel;"); + if (fileChannelFieldId == NULL) { + throwRuntimeException(env, "Unable to obtain FileChannel field for DefaultFileRegion"); + return JNI_ERR; + } + transferedFieldId = (*env)->GetFieldID(env, fileRegionCls, "transfered", "J"); + if (transferedFieldId == NULL) { + throwRuntimeException(env, "Unable to obtain transfered field for DefaultFileRegion"); + return JNI_ERR; + } + + jclass fileChannelCls = (*env)->FindClass(env, "sun/nio/ch/FileChannelImpl"); + if (fileChannelCls == NULL) { + // pending exception... + return JNI_ERR; + } + fileDescriptorFieldId = (*env)->GetFieldID(env, fileChannelCls, "fd", "Ljava/io/FileDescriptor;"); + if (fileDescriptorFieldId == NULL) { + throwRuntimeException(env, "Unable to obtain fd field for FileChannelImpl"); + return JNI_ERR; + } + + jclass fileDescriptorCls = (*env)->FindClass(env, "java/io/FileDescriptor"); + if (fileDescriptorCls == NULL) { + // pending exception... + return JNI_ERR; + } + fdFieldId = (*env)->GetFieldID(env, fileDescriptorCls, "fd", "I"); + if (fdFieldId == NULL) { + throwRuntimeException(env, "Unable to obtain fd field for FileDescriptor"); + return JNI_ERR; + } + + inetSocketAddrMethodId = (*env)->GetMethodID(env, inetSocketAddressClass, "", "(Ljava/lang/String;I)V"); + if (inetSocketAddrMethodId == NULL) { + throwRuntimeException(env, "Unable to obtain constructor of InetSocketAddress"); + return JNI_ERR; + } + socketType = socket_type(); + return JNI_VERSION_1_6; + } +} + +void JNI_OnUnload(JavaVM *vm, void *reserved) { + JNIEnv* env; + if ((*vm)->GetEnv(vm, (void **) &env, JNI_VERSION_1_6) != JNI_OK) { + // Something is wrong but nothing we can do about this :( + return; + } else { + // delete global references so the GC can collect them + if (runtimeExceptionClass != NULL) { + (*env)->DeleteGlobalRef(env, runtimeExceptionClass); + } + if (ioExceptionClass != NULL) { + (*env)->DeleteGlobalRef(env, ioExceptionClass); + } + if (closedChannelExceptionClass != NULL) { + (*env)->DeleteGlobalRef(env, closedChannelExceptionClass); + } + if (inetSocketAddressClass != NULL) { + (*env)->DeleteGlobalRef(env, inetSocketAddressClass); + } + } +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_eventFd(JNIEnv * env, jclass clazz) { + jint eventFD = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + + if (eventFD < 0) { + int err = errno; + throwRuntimeException(env, exceptionMessage("Error creating eventFD(...): ", err)); + } + return eventFD; +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_eventFdWrite(JNIEnv * env, jclass clazz, jint fd, jlong value) { + jint eventFD = eventfd_write(fd, (eventfd_t)value); + + if (eventFD < 0) { + int err = errno; + throwRuntimeException(env, exceptionMessage("Error calling eventfd_write(...): ", err)); + } +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_eventFdRead(JNIEnv * env, jclass clazz, jint fd) { + uint64_t eventfd_t; + + if (eventfd_read(fd, &eventfd_t) != 0) { + // something is serious wrong + throwRuntimeException(env, "Error calling eventfd_read(...)"); + } +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollCreate(JNIEnv * env, jclass clazz) { + jint efd = epoll_create1(EPOLL_CLOEXEC); + if (efd < 0) { + int err = errno; + throwRuntimeException(env, exceptionMessage("Error during epoll_create(...): ", err)); + } + return efd; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollWait(JNIEnv * env, jclass clazz, jint efd, jlongArray events, jint timeout) { + int len = (*env)->GetArrayLength(env, events); + struct epoll_event ev[len]; + int ready; + int err; + do { + ready = epoll_wait(efd, ev, len, timeout); + // was interrupted try again. + } while (ready == -1 && (( err = errno) == EINTR)); + + if (ready < 0) { + throwIOException(env, exceptionMessage("Error during epoll_wait(...): ", err)); + return -1; + } + if (ready == 0) { + // nothing ready for process + return 0; + } + + jboolean isCopy; + jlong *elements = (*env)->GetLongArrayElements(env, events, &isCopy); + if (elements == NULL) { + // No memory left ?!?!? + throwOutOfMemoryError(env, "Can't allocate memory"); + return -1; + } + int i; + for (i = 0; i < ready; i++) { + // store the ready ops and id + elements[i] = (jlong) ev[i].data.u64; + if (ev[i].events & EPOLLIN) { + elements[i] |= EPOLL_READ; + } + if (ev[i].events & EPOLLRDHUP) { + elements[i] |= EPOLL_RDHUP; + } + if (ev[i].events & EPOLLOUT) { + elements[i] |= EPOLL_WRITE; + } + } + jint mode; + // release again to prevent memory leak + if (isCopy) { + mode = 0; + } else { + // was just pinned so use JNI_ABORT to eliminate not needed copy. + mode = JNI_ABORT; + } + (*env)->ReleaseLongArrayElements(env, events, elements, mode); + + return ready; +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlAdd(JNIEnv * env, jclass clazz, jint efd, jint fd, jint flags, jint id) { + if (epollCtl(env, efd, EPOLL_CTL_ADD, fd, flags, id) < 0) { + int err = errno; + throwRuntimeException(env, exceptionMessage("Error during calling epoll_ctl(...): ", err)); + } +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlMod(JNIEnv * env, jclass clazz, jint efd, jint fd, jint flags, jint id) { + if (epollCtl(env, efd, EPOLL_CTL_MOD, fd, flags, id) < 0) { + int err = errno; + throwRuntimeException(env, exceptionMessage("Error during calling epoll_ctl(...): ", err)); + } +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv * env, jclass clazz, jint efd, jint fd) { + // Create an empty event to workaround a bug in older kernels which can not handle NULL. + struct epoll_event event = { 0 }; + if (epoll_ctl(efd, EPOLL_CTL_DEL, fd, &event) < 0) { + int err = errno; + throwRuntimeException(env, exceptionMessage("Error during calling epoll_ctl(...): ", err)); + } +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit) { + // TODO: We could also maybe pass the address in directly and so eliminate this call + // not sure if this would buy us much. So some testing is needed. + void *buffer = (*env)->GetDirectBufferAddress(env, jbuffer); + if (buffer == NULL) { + throwRuntimeException(env, "Unable to access address of buffer"); + return -1; + } + ssize_t res; + int err; + do { + res = write(fd, buffer + pos, (size_t) (limit - pos)); + // keep on writing if it was interrupted + } while(res == -1 && ((err = errno) == EINTR)); + + if (res < 0) { + // network stack saturated... try again later + if (err == EAGAIN || err == EWOULDBLOCK) { + return 0; + } + if (err == EBADF) { + throwClosedChannelException(env); + return -1; + } + throwIOException(env, exceptionMessage("Error while write(...): ", err)); + return -1; + } + return (jint) res; +} + +JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length) { + struct iovec iov[length]; + int i; + int iovidx = 0; + for (i = offset; i < length; i++) { + jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i); + jint pos; + // Get the current position using the (*env)->GetIntField if possible and fallback + // to slower (*env)->CallIntMethod(...) if needed + if (posFieldId == NULL) { + pos = (*env)->CallIntMethod(env, bufObj, posId, NULL); + } else { + pos = (*env)->GetIntField(env, bufObj, posFieldId); + } + jint limit; + // Get the current limit using the (*env)->GetIntField if possible and fallback + // to slower (*env)->CallIntMethod(...) if needed + if (limitFieldId == NULL) { + limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL); + } else { + limit = (*env)->GetIntField(env, bufObj, limitFieldId); + } + void *buffer = (*env)->GetDirectBufferAddress(env, bufObj); + if (buffer == NULL) { + throwRuntimeException(env, "Unable to access address of buffer"); + return -1; + } + iov[iovidx].iov_base = buffer + pos; + iov[iovidx].iov_len = (size_t) (limit - pos); + iovidx++; + } + + ssize_t res; + int err; + do { + res = writev(fd, iov, length); + // keep on writing if it was interrupted + } while(res == -1 && ((err = errno) == EINTR)); + + if (res < 0) { + if (err == EAGAIN || err == EWOULDBLOCK) { + // network stack is saturated we will try again later + return 0; + } + if (err == EBADF) { + throwClosedChannelException(env); + return -1; + } + throwIOException(env, exceptionMessage("Error while write(...): ", err)); + return -1; + } + return res; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit) { + // TODO: We could also maybe pass the address in directly and so eliminate this call + // not sure if this would buy us much. So some testing is needed. + void *buffer = (*env)->GetDirectBufferAddress(env, jbuffer); + if (buffer == NULL) { + throwRuntimeException(env, "Unable to access address of buffer"); + return -1; + } + ssize_t res; + int err; + do { + res = read(fd, buffer + pos, (size_t) (limit - pos)); + // Keep on reading if we was interrupted + } while (res == -1 && ((err = errno) == EINTR)); + + if (res < 0) { + if (err == EAGAIN || err == EWOULDBLOCK) { + // Nothing left to read + return 0; + } + if (err == EBADF) { + throwClosedChannelException(env); + return -1; + } + throwIOException(env, exceptionMessage("Error while read(...): ", err)); + return -1; + } + + if (res == 0) { + // end-of-stream + return -1; + } + return (jint) res; +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_close(JNIEnv * env, jclass clazz, jint fd) { + if (close(fd) < 0) { + throwIOException(env, "Error closing file descriptor"); + } +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_shutdown(JNIEnv * env, jclass clazz, jint fd, jboolean read, jboolean write) { + int mode; + if (read && write) { + mode = SHUT_RDWR; + } else if (read) { + mode = SHUT_RD; + } else if (write) { + mode = SHUT_WR; + } + if (shutdown(fd, mode) < 0) { + throwIOException(env, "Error shutdown socket file descriptor"); + } +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_socket(JNIEnv * env, jclass clazz) { + // TODO: Maybe also respect -Djava.net.preferIPv4Stack=true + int fd = socket(socketType, SOCK_STREAM | SOCK_NONBLOCK, 0); + if (fd == -1) { + int err = errno; + throwIOException(env, exceptionMessage("Error creating socket: ", err)); + return -1; + } else if (socketType == AF_INET6){ + // Allow to listen /connect ipv4 and ipv6 + int optval = 0; + if (setOption(env, fd, IPPROTO_IPV6, IPV6_V6ONLY, &optval, sizeof(optval)) < 0) { + // Something went wrong so close the fd and return here. setOption(...) itself throws the exception already. + close(fd); + return -1; + } + } + return fd; +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_bind(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port) { + struct sockaddr_storage addr; + init_sockaddr(env, address, scopeId, port, &addr); + + if(bind(fd, (struct sockaddr *) &addr, sizeof(addr)) == -1){ + int err = errno; + throwIOException(env, exceptionMessage("Error during bind(...): ", err)); + } +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_listen(JNIEnv * env, jclass clazz, jint fd, jint backlog) { + if(listen(fd, backlog) == -1) { + int err = errno; + throwIOException(env, exceptionMessage("Error during listen(...): ", err)); + } +} + +JNIEXPORT jboolean JNICALL Java_io_netty_channel_epoll_Native_connect(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port) { + struct sockaddr_storage addr; + init_sockaddr(env, address, scopeId, port, &addr); + + int res; + int err; + do { + res = connect(fd, (struct sockaddr *) &addr, sizeof(addr)); + } while (res == -1 && ((err = errno) == EINTR)); + + if (res < 0) { + if (err == EINPROGRESS) { + // connect not complete yet need to wait for EPOLLOUT event + return JNI_FALSE; + } + throwIOException(env, exceptionMessage("Unable to connect to remote host: ", err)); + + return JNI_FALSE; + } + return JNI_TRUE; +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_finishConnect(JNIEnv * env, jclass clazz, jint fd) { + // connect done, check for error + int optval; + int res = getOption(env, fd, SOL_SOCKET, SO_ERROR, &optval, sizeof(optval)); + if (res == 0) { + return; + } + throwIOException(env, exceptionMessage("Unable to connect to remote host: ", optval)); +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_accept(JNIEnv * env, jclass clazz, jint fd) { + jint socketFd; + int err; + + do { + if (accept4) { + socketFd = accept4(fd, NULL, 0, SOCK_NONBLOCK | SOCK_CLOEXEC); + } else { + socketFd = accept(fd, NULL, 0); + } + } while (socketFd == -1 && ((err = errno) == EINTR)); + + if (socketFd == -1) { + if (err == EAGAIN || err == EWOULDBLOCK) { + // Everything consumed so just return -1 here. + return -1; + } else { + throwIOException(env, exceptionMessage("Error during accept(...): ", err)); + return -1; + } + } + if (accept4) { + return socketFd; + } else { + // accept4 was not present so need two more sys-calls ... + if (fcntl(socketFd, F_SETFD, FD_CLOEXEC) == -1) { + throwIOException(env, exceptionMessage("Error during accept(...): ", err)); + return -1; + } + if (fcntl(socketFd, F_SETFL, O_NONBLOCK) == -1) { + throwIOException(env, exceptionMessage("Error during accept(...): ", err)); + return -1; + } + } + return socketFd; +} + +JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_sendfile(JNIEnv *env, jclass clazz, jint fd, jobject fileRegion, jlong off, jlong len) { + jobject fileChannel = (*env)->GetObjectField(env, fileRegion, fileChannelFieldId); + if (fileChannel == NULL) { + throwRuntimeException(env, "Unable to obtain FileChannel from FileRegion"); + return -1; + } + jobject fileDescriptor = (*env)->GetObjectField(env, fileChannel, fileDescriptorFieldId); + if (fileDescriptor == NULL) { + throwRuntimeException(env, "Unable to obtain FileDescriptor from FileChannel"); + return -1; + } + jint srcFd = (*env)->GetIntField(env, fileDescriptor, fdFieldId); + if (srcFd == -1) { + throwRuntimeException(env, "Unable to obtain the fd from the FileDescriptor"); + return -1; + } + ssize_t res; + off_t offset = off; + int err; + do { + res = sendfile(fd, srcFd, &offset, (size_t) len); + } while (res == -1 && ((err = errno) == EINTR)); + if (res < 0) { + if (err == EAGAIN) { + return 0; + } + throwIOException(env, exceptionMessage("Error during accept(...): ", err)); + return -1; + } + if (res > 0) { + // update the transfered field in DefaultFileRegion + (*env)->SetLongField(env, fileRegion, transferedFieldId, off + res); + } + + return res; +} + +JNIEXPORT jobject JNICALL Java_io_netty_channel_epoll_Native_remoteAddress(JNIEnv * env, jclass clazz, jint fd) { + socklen_t len; + struct sockaddr_storage addr; + + len = sizeof addr; + if (getpeername(fd, (struct sockaddr*)&addr, &len) == -1) { + return NULL; + } + return createInetSocketAddress(env, addr); +} + +JNIEXPORT jobject JNICALL Java_io_netty_channel_epoll_Native_localAddress(JNIEnv * env, jclass clazz, jint fd) { + socklen_t len; + struct sockaddr_storage addr; + + len = sizeof addr; + if (getsockname(fd, (struct sockaddr*)&addr, &len) == -1) { + return NULL; + } + return createInetSocketAddress(env, addr); +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setReuseAddress(JNIEnv * env, jclass clazz, jint fd, jint optval) { + setOption(env, fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpNoDelay(JNIEnv *env, jclass clazz, jint fd, jint optval) { + setOption(env, fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)); +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setReceiveBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval) { + setOption(env, fd, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval)); +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setSendBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval) { + setOption(env, fd, SOL_SOCKET, SO_SNDBUF, &optval, sizeof(optval)); +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setKeepAlive(JNIEnv *env, jclass clazz, jint fd, jint optval) { + setOption(env, fd, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)); +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv *env, jclass clazz, jint fd, jint optval) { + setOption(env, fd, SOL_TCP, TCP_CORK, &optval, sizeof(optval)); +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setSoLinger(JNIEnv *env, jclass clazz, jint fd, jint optval) { + setOption(env, fd, IPPROTO_IP, IP_TOS, &optval, sizeof(optval)); +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTrafficClass(JNIEnv *env, jclass clazz, jint fd, jint optval) { + struct linger solinger; + if (optval < 0) { + solinger.l_onoff = 0; + solinger.l_linger = 0; + } else { + solinger.l_onoff = 1; + solinger.l_linger = optval; + } + setOption(env, fd, SOL_SOCKET, SO_LINGER, &solinger, sizeof(solinger)); +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isReuseAddresss(JNIEnv *env, jclass clazz, jint fd) { + int optval; + if (getOption(env, fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isTcpNoDelay(JNIEnv *env, jclass clazz, jint fd) { + int optval; + if (getOption(env, fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getReceiveBufferSize(JNIEnv * env, jclass clazz, jint fd) { + int optval; + if (getOption(env, fd, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getSendBufferSize(JNIEnv *env, jclass clazz, jint fd) { + int optval; + if (getOption(env, fd, SOL_SOCKET, SO_SNDBUF, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isTcpCork(JNIEnv *env, jclass clazz, jint fd) { + int optval; + if (getOption(env, fd, SOL_TCP, TCP_CORK, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getSoLinger(JNIEnv *env, jclass clazz, jint fd) { + struct linger optval; + if (getOption(env, fd, SOL_SOCKET, SO_LINGER, &optval, sizeof(optval)) == -1) { + return -1; + } + if (optval.l_onoff == 0) { + return -1; + } else { + return optval.l_linger; + } +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv *env, jclass clazz, jint fd) { + int optval; + if (getOption(env, fd, IPPROTO_IP, IP_TOS, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h new file mode 100644 index 0000000000..3b93bb0c42 --- /dev/null +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h @@ -0,0 +1,60 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +#include + + +#define EPOLL_READ 0x01 +#define EPOLL_WRITE 0x02 +#define EPOLL_ACCEPT 0x04 +#define EPOLL_RDHUP 0x08 + +jint Java_io_netty_channel_epoll_Native_eventFd(JNIEnv * env, jclass clazz); +void Java_io_netty_channel_epoll_Native_eventFdWrite(JNIEnv * env, jclass clazz, jint fd, jlong value); +void Java_io_netty_channel_epoll_Native_eventFdRead(JNIEnv * env, jclass clazz, jint fd); +jint Java_io_netty_channel_epoll_Native_epollCreate(JNIEnv * env, jclass clazz); +jint Java_io_netty_channel_epoll_Native_epollWait(JNIEnv * env, jclass clazz, jint efd, jlongArray events, jint timeout); +void Java_io_netty_channel_epoll_Native_epollCtlAdd(JNIEnv * env, jclass clazz, jint efd, jint fd, jint flags, jint id); +void Java_io_netty_channel_epoll_Native_epollCtlMod(JNIEnv * env, jclass clazz, jint efd, jint fd, jint flags, jint id); +void Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv * env, jclass clazz, jint efd, jint fd); +jint Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit); +jlong Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length); +jint Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit); +void JNICALL Java_io_netty_channel_epoll_Native_close(JNIEnv * env, jclass clazz, jint fd); +void Java_io_netty_channel_epoll_Native_shutdown(JNIEnv * env, jclass clazz, jint fd, jboolean read, jboolean write); +jint Java_io_netty_channel_epoll_Native_socket(JNIEnv * env, jclass clazz); +void Java_io_netty_channel_epoll_Native_bind(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port); +void Java_io_netty_channel_epoll_Native_listen(JNIEnv * env, jclass clazz, jint fd, jint backlog); +jboolean Java_io_netty_channel_epoll_Native_connect(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port); +void Java_io_netty_channel_epoll_Native_finishConnect(JNIEnv * env, jclass clazz, jint fd); +jint Java_io_netty_channel_epoll_Native_accept(JNIEnv * env, jclass clazz, jint fd); +jlong Java_io_netty_channel_epoll_Native_sendfile(JNIEnv *env, jclass clazz, jint fd, jobject fileRegion, jlong off, jlong len); +jobject Java_io_netty_channel_epoll_Native_remoteAddress(JNIEnv * env, jclass clazz, jint fd); +jobject Java_io_netty_channel_epoll_Native_localAddress(JNIEnv * env, jclass clazz, jint fd); +void Java_io_netty_channel_epoll_Native_setReuseAddress(JNIEnv * env, jclass clazz, jint fd, jint optval); +void Java_io_netty_channel_epoll_Native_setTcpNoDelay(JNIEnv *env, jclass clazz, jint fd, jint optval); +void Java_io_netty_channel_epoll_Native_setReceiveBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval); +void Java_io_netty_channel_epoll_Native_setSendBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval); +void Java_io_netty_channel_epoll_Native_setKeepAlive(JNIEnv *env, jclass clazz, jint fd, jint optval); +void Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv *env, jclass clazz, jint fd, jint optval); +void Java_io_netty_channel_epoll_Native_setSoLinger(JNIEnv *env, jclass clazz, jint fd, jint optval); +void Java_io_netty_channel_epoll_Native_setTrafficClass(JNIEnv *env, jclass clazz, jint fd, jint optval); +jint Java_io_netty_channel_epoll_Native_isReuseAddresss(JNIEnv *env, jclass clazz, jint fd); +jint Java_io_netty_channel_epoll_Native_isTcpNoDelay(JNIEnv *env, jclass clazz, jint fd); +jint Java_io_netty_channel_epoll_Native_getReceiveBufferSize(JNIEnv * env, jclass clazz, jint fd); +jint Java_io_netty_channel_epoll_Native_getSendBufferSize(JNIEnv *env, jclass clazz, jint fd); +jint Java_io_netty_channel_epoll_Native_isTcpCork(JNIEnv *env, jclass clazz, jint fd); +jint Java_io_netty_channel_epoll_Native_getSoLinger(JNIEnv *env, jclass clazz, jint fd); +jint Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv *env, jclass clazz, jint fd); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java new file mode 100644 index 0000000000..c991ce0f62 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -0,0 +1,163 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.AbstractChannel; +import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelMetadata; +import io.netty.channel.EventLoop; + +import java.io.IOException; +import java.net.InetSocketAddress; + +abstract class AbstractEpollChannel extends AbstractChannel { + private static final ChannelMetadata DATA = new ChannelMetadata(false); + private final int readFlag; + protected int flags; + protected volatile boolean active; + volatile int fd; + int id; + + AbstractEpollChannel(Channel parent, int fd, int flag) { + super(parent); + this.fd = fd; + readFlag = flag; + flags |= flag; + } + + AbstractEpollChannel(int flag) { + this(null, socketFd(), flag); + } + + private static int socketFd() { + try { + return Native.socket(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public boolean isActive() { + return active; + } + + @Override + public ChannelMetadata metadata() { + return DATA; + } + + @Override + protected void doClose() throws Exception { + active = false; + int fd = this.fd; + this.fd = -1; + Native.close(fd); + } + + @Override + public InetSocketAddress remoteAddress() { + return (InetSocketAddress) super.remoteAddress(); + } + + @Override + public InetSocketAddress localAddress() { + return (InetSocketAddress) super.localAddress(); + } + + @Override + protected void doDisconnect() throws Exception { + doClose(); + } + + @Override + protected boolean isCompatible(EventLoop loop) { + return loop instanceof EpollEventLoop; + } + + @Override + public boolean isOpen() { + return fd != -1; + } + + @Override + protected void doDeregister() throws Exception { + ((EpollEventLoop) eventLoop()).remove(this); + } + + @Override + protected void doBeginRead() throws Exception { + if ((flags & readFlag) == 0) { + flags |= readFlag; + ((EpollEventLoop) eventLoop()).modify(this); + } + } + + protected final void clearEpollIn() { + if ((flags & readFlag) != 0) { + flags = ~readFlag; + ((EpollEventLoop) eventLoop()).modify(this); + } + } + + @Override + protected void doRegister() throws Exception { + EpollEventLoop loop = (EpollEventLoop) eventLoop(); + loop.add(this); + } + + @Override + protected abstract AbstractEpollUnsafe newUnsafe(); + + protected abstract class AbstractEpollUnsafe extends AbstractUnsafe { + + /** + * Called once EPOLLIN event is ready to be processed + */ + abstract void epollInReady(); + + /** + * Called once EPOLLRDHUP event is ready to be processed + */ + void epollRdHupReady() { + // NOOP + } + + @Override + protected void flush0() { + // Flush immediately only when there's no pending flush. + // If there's a pending flush operation, event loop will call forceFlush() later, + // and thus there's no need to call it now. + if (isFlushPending()) { + return; + } + super.flush0(); + } + + /** + * Called once a EPOLLOUT event is ready to be processed + */ + void epollOutReady() { + // directly call super.flush0() to force a flush now + super.flush0(); + } + + private boolean isFlushPending() { + return (flags & Native.EPOLLOUT) != 0; + } + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java new file mode 100644 index 0000000000..58f189641f --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java @@ -0,0 +1,26 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.ChannelOption; + +public final class EpollChannelOption { + private static final Class T = EpollChannelOption.class; + + public static final ChannelOption TCP_CORK = ChannelOption.valueOf(T, "TCP_CORK"); + + private EpollChannelOption() { } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java new file mode 100644 index 0000000000..6adb90aff1 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -0,0 +1,356 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SingleThreadEventLoop; +import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe; +import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +/** + * {@link EventLoop} which uses epoll under the covers. Only works on Linux! + */ +final class EpollEventLoop extends SingleThreadEventLoop { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class); + private static final AtomicIntegerFieldUpdater WAKEN_UP_UPDATER; + + static { + AtomicIntegerFieldUpdater updater = + PlatformDependent.newAtomicIntegerFieldUpdater(EpollEventLoop.class, "wakenUp"); + if (updater == null) { + updater = AtomicIntegerFieldUpdater.newUpdater(EpollEventLoop.class, "wakenUp"); + } + WAKEN_UP_UPDATER = updater; + } + + private final int epollFd; + private final int eventFd; + private final Map ids = new HashMap(); + private final long[] events; + + private int id; + private int oldWakenUp; + private boolean overflown; + + @SuppressWarnings("unused") + private volatile int wakenUp; + private volatile int ioRatio = 50; + + EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents) { + super(parent, executor, false); + events = new long[maxEvents]; + boolean success = false; + int epollFd = -1; + int eventFd = -1; + try { + this.epollFd = epollFd = Native.epollCreate(); + this.eventFd = eventFd = Native.eventFd(); + Native.epollCtlAdd(epollFd, eventFd, Native.EPOLLIN, 0); + success = true; + } finally { + if (!success) { + if (epollFd != -1) { + try { + Native.close(epollFd); + } catch (Exception e) { + // ignore + } + } + if (eventFd != -1) { + try { + Native.close(eventFd); + } catch (Exception e) { + // ignore + } + } + } + } + } + + private int nextId() { + int id = this.id; + if (id == Integer.MAX_VALUE) { + overflown = true; + id = 0; + } + if (overflown) { + // the ids had an overflow before so we need to make sure the id is not in use atm before assign + // it. + for (;;) { + if (!ids.containsKey(++id)) { + this.id = id; + break; + } + } + } else { + this.id = ++id; + } + return id; + } + + @Override + protected void wakeup(boolean inEventLoop) { + if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) { + // write to the evfd which will then wake-up epoll_wait(...) + Native.eventFdWrite(eventFd, 1L); + } + } + + /** + * Register the given epoll with this {@link io.netty.channel.EventLoop}. + */ + void add(AbstractEpollChannel ch) { + assert inEventLoop(); + int id = nextId(); + Native.epollCtlAdd(epollFd, ch.fd, ch.flags, id); + ch.id = id; + ids.put(id, ch); + } + + /** + * The flags of the given epoll was modified so update the registration + */ + void modify(AbstractEpollChannel ch) { + assert inEventLoop(); + Native.epollCtlMod(epollFd, ch.fd, ch.flags, ch.id); + } + + /** + * Deregister the given epoll from this {@link io.netty.channel.EventLoop}. + */ + void remove(AbstractEpollChannel ch) { + assert inEventLoop(); + if (ids.remove(ch.id) != null && ch.isOpen()) { + // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically + // removed once the file-descriptor is closed. + Native.epollCtlDel(epollFd, ch.fd); + } + } + + @Override + protected Queue newTaskQueue() { + // This event loop never calls takeTask() + return new ConcurrentLinkedQueue(); + } + + /** + * Returns the percentage of the desired amount of time spent for I/O in the event loop. + */ + public int getIoRatio() { + return ioRatio; + } + + /** + * Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is + * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks. + */ + public void setIoRatio(int ioRatio) { + if (ioRatio <= 0 || ioRatio > 100) { + throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)"); + } + this.ioRatio = ioRatio; + } + + private int epollWait() { + int selectCnt = 0; + long currentTimeNanos = System.nanoTime(); + long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); + for (;;) { + long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; + if (timeoutMillis <= 0) { + if (selectCnt == 0) { + int ready = Native.epollWait(epollFd, events, 0); + if (ready > 0) { + return ready; + } + } + break; + } + + int selectedKeys = Native.epollWait(epollFd, events, (int) timeoutMillis); + selectCnt ++; + + if (selectedKeys != 0 || oldWakenUp == 1 || wakenUp == 1 || hasTasks()) { + // Selected something, + // waken up by user, or + // the task queue has a pending task. + return selectedKeys; + } + currentTimeNanos = System.nanoTime(); + } + return 0; + } + + @Override + protected void run() { + for (;;) { + oldWakenUp = WAKEN_UP_UPDATER.getAndSet(this, 0); + try { + int ready; + if (hasTasks()) { + // Non blocking just return what is ready directly without block + ready = Native.epollWait(epollFd, events, 0); + } else { + ready = epollWait(); + + // 'wakenUp.compareAndSet(false, true)' is always evaluated + // before calling 'selector.wakeup()' to reduce the wake-up + // overhead. (Selector.wakeup() is an expensive operation.) + // + // However, there is a race condition in this approach. + // The race condition is triggered when 'wakenUp' is set to + // true too early. + // + // 'wakenUp' is set to true too early if: + // 1) Selector is waken up between 'wakenUp.set(false)' and + // 'selector.select(...)'. (BAD) + // 2) Selector is waken up between 'selector.select(...)' and + // 'if (wakenUp.get()) { ... }'. (OK) + // + // In the first case, 'wakenUp' is set to true and the + // following 'selector.select(...)' will wake up immediately. + // Until 'wakenUp' is set to false again in the next round, + // 'wakenUp.compareAndSet(false, true)' will fail, and therefore + // any attempt to wake up the Selector will fail, too, causing + // the following 'selector.select(...)' call to block + // unnecessarily. + // + // To fix this problem, we wake up the selector again if wakenUp + // is true immediately after selector.select(...). + // It is inefficient in that it wakes up the selector for both + // the first case (BAD - wake-up required) and the second case + // (OK - no wake-up required). + + if (wakenUp == 1) { + Native.eventFdWrite(eventFd, 1L); + } + } + + final int ioRatio = this.ioRatio; + if (ioRatio == 100) { + if (ready > 0) { + processReady(events, ready); + } + runAllTasks(); + } else { + final long ioStartTime = System.nanoTime(); + + if (ready > 0) { + processReady(events, ready); + } + + final long ioTime = System.nanoTime() - ioStartTime; + runAllTasks(ioTime * (100 - ioRatio) / ioRatio); + } + + if (isShuttingDown()) { + closeAll(); + if (confirmShutdown()) { + break; + } + } + } catch (Throwable t) { + logger.warn("Unexpected exception in the selector loop.", t); + + // Prevent possible consecutive immediate failures that lead to + // excessive CPU consumption. + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore. + } + } + } + } + + private void closeAll() { + int ready = Native.epollWait(epollFd, events, 0); + Collection channels = new ArrayList(ready); + + for (int i = 0; i < ready; i++) { + final long ev = events[i]; + + int id = (int) (ev >> 32L); + AbstractEpollChannel ch = ids.get(id); + if (ch != null) { + channels.add(ids.get(id)); + } + } + + for (AbstractEpollChannel ch: channels) { + ch.unsafe().close(ch.unsafe().voidPromise()); + } + } + + private void processReady(long[] events, int ready) { + for (int i = 0; i < ready; i ++) { + final long ev = events[i]; + + int id = (int) (ev >> 32L); + if (id == 0) { + // consume wakeup event + Native.eventFdRead(eventFd); + } else { + boolean read = (ev & Native.EPOLLIN) != 0; + boolean write = (ev & Native.EPOLLOUT) != 0; + boolean close = (ev & Native.EPOLLRDHUP) != 0; + + AbstractEpollChannel ch = ids.get(id); + if (ch != null) { + AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe(); + if (write) { + // force flush of data as the epoll is writable again + unsafe.epollOutReady(); + } + if (read) { + // Something is ready to read, so consume it now + unsafe.epollInReady(); + } + if (close) { + unsafe.epollRdHupReady(); + } + } + } + } + } + + @Override + protected void cleanup() { + try { + Native.close(epollFd); + } catch (IOException e) { + logger.warn("Failed to close the epoll fd.", e); + } + try { + Native.close(eventFd); + } catch (IOException e) { + logger.warn("Failed to close the event fd.", e); + } + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java new file mode 100644 index 0000000000..f4a7867375 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java @@ -0,0 +1,74 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.util.concurrent.EventExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadFactory; + +/** + * {@link EventLoopGroup} which uses epoll under the covers. Because of this + * it only works on linux. + */ +public final class EpollEventLoopGroup extends MultithreadEventLoopGroup { + + /** + * Create a new instance using the default number of threads and the default {@link ThreadFactory}. + */ + public EpollEventLoopGroup() { + this(0); + } + + /** + * Create a new instance using the specified number of threads and the default {@link ThreadFactory}. + */ + public EpollEventLoopGroup(int nThreads) { + this(nThreads, null); + } + + /** + * Create a new instance using the specified number of threads and the given {@link ThreadFactory}. + */ + public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory) { + this(nThreads, threadFactory, 128); + } + + /** + * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given + * maximal amount of epoll events to handle per epollWait(...). + */ + public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce) { + super(nThreads, threadFactory, maxEventsAtOnce); + } + + /** + * Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is + * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks. + */ + public void setIoRatio(int ioRatio) { + for (EventExecutor e: children()) { + ((EpollEventLoop) e).setIoRatio(ioRatio); + } + } + + @Override + protected EventExecutor newChild(Executor executor, Object... args) throws Exception { + return new EpollEventLoop(this, executor, (Integer) args[0]); + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java new file mode 100644 index 0000000000..54a5369dfd --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java @@ -0,0 +1,124 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.ServerSocketChannelConfig; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +/** + * {@link ServerSocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for + * maximal performance. + */ +public final class EpollServerSocketChannel extends AbstractEpollChannel implements ServerSocketChannel { + + private final EpollServerSocketChannelConfig config; + private volatile InetSocketAddress local; + + public EpollServerSocketChannel() { + super(Native.EPOLLACCEPT); + config = new EpollServerSocketChannelConfig(this); + } + + @Override + protected boolean isCompatible(EventLoop loop) { + return loop instanceof EpollEventLoop; + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + InetSocketAddress addr = (InetSocketAddress) localAddress; + Native.bind(fd, addr.getAddress(), addr.getPort()); + local = addr; + Native.listen(fd, config.getBacklog()); + active = true; + } + + @Override + public ServerSocketChannelConfig config() { + return config; + } + + @Override + protected InetSocketAddress localAddress0() { + return local; + } + + @Override + protected InetSocketAddress remoteAddress0() { + return null; + } + + @Override + protected AbstractEpollUnsafe newUnsafe() { + return new EpollServerSocketUnsafe(); + } + + @Override + protected void doWrite(ChannelOutboundBuffer in) { + throw new UnsupportedOperationException(); + } + + final class EpollServerSocketUnsafe extends AbstractEpollUnsafe { + @Override + public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) { + // Connect not supported by ServerChannel implementations + channelPromise.setFailure(new UnsupportedOperationException()); + } + + @Override + void epollInReady() { + assert eventLoop().inEventLoop(); + try { + final ChannelPipeline pipeline = pipeline(); + Throwable exception = null; + try { + for (;;) { + int socketFd = Native.accept(fd); + if (socketFd == -1) { + // this means everything was handled for now + break; + } + try { + pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, socketFd)); + } catch (Throwable t) { + // keep on reading as we use epoll ET and need to consume everything from the socket + pipeline.fireChannelReadComplete(); + pipeline.fireExceptionCaught(t); + } + } + } catch (Throwable t) { + exception = t; + } + pipeline.fireChannelReadComplete(); + + if (exception != null) { + pipeline.fireExceptionCaught(exception); + } + } finally { + if (!config().isAutoRead()) { + clearEpollIn(); + } + } + } + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java new file mode 100644 index 0000000000..791a65b46a --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java @@ -0,0 +1,176 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelOption; +import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.MessageSizeEstimator; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.socket.ServerSocketChannelConfig; +import io.netty.util.NetUtil; + +import java.util.Map; + +import static io.netty.channel.ChannelOption.SO_BACKLOG; +import static io.netty.channel.ChannelOption.SO_RCVBUF; +import static io.netty.channel.ChannelOption.SO_REUSEADDR; + +final class EpollServerSocketChannelConfig extends DefaultChannelConfig + implements ServerSocketChannelConfig { + + private final EpollServerSocketChannel channel; + private volatile int backlog = NetUtil.SOMAXCONN; + + EpollServerSocketChannelConfig(EpollServerSocketChannel channel) { + super(channel); + this.channel = channel; + } + + @Override + public Map, Object> getOptions() { + return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG); + } + + @SuppressWarnings("unchecked") + @Override + public T getOption(ChannelOption option) { + if (option == SO_RCVBUF) { + return (T) Integer.valueOf(getReceiveBufferSize()); + } + if (option == SO_REUSEADDR) { + return (T) Boolean.valueOf(isReuseAddress()); + } + if (option == SO_BACKLOG) { + return (T) Integer.valueOf(getBacklog()); + } + + return super.getOption(option); + } + + @Override + public boolean setOption(ChannelOption option, T value) { + validate(option, value); + + if (option == SO_RCVBUF) { + setReceiveBufferSize((Integer) value); + } else if (option == SO_REUSEADDR) { + setReuseAddress((Boolean) value); + } else if (option == SO_BACKLOG) { + setBacklog((Integer) value); + } else { + return super.setOption(option, value); + } + + return true; + } + + @Override + public boolean isReuseAddress() { + return Native.isReuseAddress(channel.fd) == 1; + } + + @Override + public ServerSocketChannelConfig setReuseAddress(boolean reuseAddress) { + Native.setReuseAddress(channel.fd, reuseAddress ? 1 : 0); + return this; + } + + @Override + public int getReceiveBufferSize() { + return Native.getReceiveBufferSize(channel.fd); + } + + @Override + public ServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) { + Native.setReceiveBufferSize(channel.fd, receiveBufferSize); + + return this; + } + + @Override + public ServerSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth) { + return this; + } + + @Override + public int getBacklog() { + return backlog; + } + + @Override + public ServerSocketChannelConfig setBacklog(int backlog) { + if (backlog < 0) { + throw new IllegalArgumentException("backlog: " + backlog); + } + this.backlog = backlog; + return this; + } + + @Override + public ServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { + super.setConnectTimeoutMillis(connectTimeoutMillis); + return this; + } + + @Override + public ServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { + super.setMaxMessagesPerRead(maxMessagesPerRead); + return this; + } + + @Override + public ServerSocketChannelConfig setWriteSpinCount(int writeSpinCount) { + super.setWriteSpinCount(writeSpinCount); + return this; + } + + @Override + public ServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) { + super.setAllocator(allocator); + return this; + } + + @Override + public ServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { + super.setRecvByteBufAllocator(allocator); + return this; + } + + @Override + public ServerSocketChannelConfig setAutoRead(boolean autoRead) { + super.setAutoRead(autoRead); + return this; + } + + @Override + public ServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { + super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); + return this; + } + + @Override + public ServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { + super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); + return this; + } + + @Override + public ServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { + super.setMessageSizeEstimator(estimator); + return this; + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java new file mode 100644 index 0000000000..a9af87cfdf --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -0,0 +1,590 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.ConnectTimeoutException; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.EventLoop; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.socket.ChannelInputShutdownEvent; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.util.internal.StringUtil; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * {@link SocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for + * maximal performance. + */ +public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel { + private final EpollSocketChannelConfig config; + + /** + * The future of the current connection attempt. If not null, subsequent + * connection attempts will fail. + */ + private ChannelPromise connectPromise; + private ScheduledFuture connectTimeoutFuture; + private SocketAddress requestedRemoteAddress; + + private volatile boolean inputShutdown; + private volatile boolean outputShutdown; + + EpollSocketChannel(Channel parent, int fd) { + super(parent, fd, Native.EPOLLIN); + active = true; + config = new EpollSocketChannelConfig(this); + } + + public EpollSocketChannel() { + super(Native.EPOLLIN); + config = new EpollSocketChannelConfig(this); + } + + @Override + protected AbstractEpollUnsafe newUnsafe() { + return new EpollSocketUnsafe(); + } + + @Override + protected SocketAddress localAddress0() { + return Native.localAddress(fd); + } + + @Override + protected SocketAddress remoteAddress0() { + return Native.remoteAddress(fd); + } + + @Override + protected void doBind(SocketAddress local) throws Exception { + InetSocketAddress localAddress = (InetSocketAddress) local; + Native.bind(fd, localAddress.getAddress(), localAddress.getPort()); + } + + private void setEpollOut() { + if ((flags & Native.EPOLLOUT) == 0) { + flags |= Native.EPOLLOUT; + ((EpollEventLoop) eventLoop()).modify(this); + } + } + + private void clearEpollOut() { + if ((flags & Native.EPOLLOUT) != 0) { + flags = ~Native.EPOLLOUT; + ((EpollEventLoop) eventLoop()).modify(this); + } + } + + /** + * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. + * @param buf the {@link ByteBuf} from which the bytes should be written + * @return amount the amount of written bytes + */ + private int doWriteBytes(ByteBuf buf, int readable) throws Exception { + int readerIndex = buf.readerIndex(); + int localFlushedAmount; + if (buf.nioBufferCount() == 1) { + ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, readable); + localFlushedAmount = Native.write(fd, nioBuf, nioBuf.position(), nioBuf.limit()); + } else { + // backed by more then one buffer, do a gathering write... + ByteBuffer[] nioBufs = buf.nioBuffers(); + localFlushedAmount = (int) Native.writev(fd, nioBufs, 0, nioBufs.length); + } + if (localFlushedAmount > 0) { + buf.readerIndex(readerIndex + localFlushedAmount); + } + return localFlushedAmount; + } + + /** + * Write a {@link DefaultFileRegion} + * + * @param region the {@link DefaultFileRegion} from which the bytes should be written + * @return amount the amount of written bytes + */ + private long doWriteFileRegion(DefaultFileRegion region, long count) throws Exception { + return Native.sendfile(fd, region, region.transfered(), count); + } + + @Override + protected void doWrite(ChannelOutboundBuffer in) throws Exception { + for (;;) { + final int msgCount = in.size(); + + if (msgCount == 0) { + // Wrote all messages. + clearEpollOut(); + break; + } + // Do non-gathering write for a single buffer case. + if (msgCount > 1) { + // Ensure the pending writes are made of ByteBufs only. + ByteBuffer[] nioBuffers = in.nioBuffers(); + if (nioBuffers != null) { + + int nioBufferCnt = in.nioBufferCount(); + long expectedWrittenBytes = in.nioBufferSize(); + + long localWrittenBytes = Native.writev(fd, nioBuffers, 0, nioBufferCnt); + + if (localWrittenBytes < expectedWrittenBytes) { + int nioBufIndex = 0; + setEpollOut(); + + // Did not write all buffers completely. + // Release the fully written buffers and update the indexes of the partially written buffer. + for (int i = msgCount; i > 0; i --) { + final ByteBuf buf = (ByteBuf) in.current(); + final int readerIndex = buf.readerIndex(); + final int readableBytes = buf.writerIndex() - readerIndex; + + if (readableBytes < localWrittenBytes) { + nioBufIndex += buf.nioBufferCount(); + in.remove(); + localWrittenBytes -= readableBytes; + } else if (readableBytes > localWrittenBytes) { + + buf.readerIndex(readerIndex + (int) localWrittenBytes); + in.progress(localWrittenBytes); + + // update position in ByteBuffer as we not do this in the native methods + ByteBuffer bb = nioBuffers[nioBufIndex]; + bb.position(bb.position() + (int) localWrittenBytes); + break; + } else { // readable == writtenBytes + in.remove(); + break; + } + } + } else { + // Release all buffers + for (int i = msgCount; i > 0; i --) { + in.remove(); + } + } + // try again as a ChannelFuture may be notified in the meantime and triggered another flush + continue; + } + } + + Object msg = in.current(); + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + int readableBytes = buf.readableBytes(); + if (readableBytes == 0) { + in.remove(); + continue; + } + + int expected = buf.readableBytes(); + int localFlushedAmount = doWriteBytes(buf, expected); + in.progress(localFlushedAmount); + if (localFlushedAmount < expected) { + setEpollOut(); + break; + } + if (!buf.isReadable()) { + in.remove(); + } + + } else if (msg instanceof DefaultFileRegion) { + DefaultFileRegion region = (DefaultFileRegion) msg; + + long expected = region.count() - region.position(); + long localFlushedAmount = doWriteFileRegion(region, expected); + in.progress(localFlushedAmount); + + if (localFlushedAmount < expected) { + setEpollOut(); + break; + } + + if (region.transfered() >= region.count()) { + in.remove(); + } + } else { + throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg)); + } + } + } + + @Override + public EpollSocketChannelConfig config() { + return config; + } + + @Override + public boolean isInputShutdown() { + return inputShutdown; + } + + @Override + public boolean isOutputShutdown() { + return outputShutdown || !isActive(); + } + + @Override + public ChannelFuture shutdownOutput() { + return shutdownOutput(newPromise()); + } + + @Override + public ChannelFuture shutdownOutput(final ChannelPromise promise) { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + try { + Native.shutdown(fd, false, true); + outputShutdown = true; + promise.setSuccess(); + } catch (Throwable t) { + promise.setFailure(t); + } + } else { + loop.execute(new Runnable() { + @Override + public void run() { + shutdownOutput(promise); + } + }); + } + return promise; + } + + @Override + public ServerSocketChannel parent() { + return (ServerSocketChannel) super.parent(); + } + + final class EpollSocketUnsafe extends AbstractEpollUnsafe { + private RecvByteBufAllocator.Handle allocHandle; + + @Override + public void write(Object msg, ChannelPromise promise) { + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + if (!buf.isDirect()) { + // We can only handle direct buffers so we need to copy if a non direct is + // passed to write. + int readable = buf.readableBytes(); + ByteBuf dst = alloc().directBuffer(readable); + dst.writeBytes(buf, buf.readerIndex(), readable); + + buf.release(); + msg = dst; + } + } + super.write(msg, promise); + } + + private void closeOnRead(ChannelPipeline pipeline) { + inputShutdown = true; + if (isOpen()) { + if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { + clearEpollIn(); + pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); + } else { + close(voidPromise()); + } + } + } + + private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { + if (byteBuf != null) { + if (byteBuf.isReadable()) { + pipeline.fireChannelRead(byteBuf); + } else { + byteBuf.release(); + } + } + pipeline.fireChannelReadComplete(); + pipeline.fireExceptionCaught(cause); + if (close || cause instanceof IOException) { + closeOnRead(pipeline); + return true; + } + return false; + } + + @Override + public void connect( + final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { + if (!promise.setUncancellable() || !ensureOpen(promise)) { + return; + } + + try { + if (connectPromise != null) { + throw new IllegalStateException("connection attempt already made"); + } + + boolean wasActive = isActive(); + if (doConnect((InetSocketAddress) remoteAddress, (InetSocketAddress) localAddress)) { + fulfillConnectPromise(promise, wasActive); + } else { + connectPromise = promise; + requestedRemoteAddress = remoteAddress; + + // Schedule connect timeout. + int connectTimeoutMillis = config().getConnectTimeoutMillis(); + if (connectTimeoutMillis > 0) { + connectTimeoutFuture = eventLoop().schedule(new Runnable() { + @Override + public void run() { + ChannelPromise connectPromise = EpollSocketChannel.this.connectPromise; + ConnectTimeoutException cause = + new ConnectTimeoutException("connection timed out: " + remoteAddress); + if (connectPromise != null && connectPromise.tryFailure(cause)) { + close(voidPromise()); + } + } + }, connectTimeoutMillis, TimeUnit.MILLISECONDS); + } + + promise.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isCancelled()) { + if (connectTimeoutFuture != null) { + connectTimeoutFuture.cancel(false); + } + connectPromise = null; + close(voidPromise()); + } + } + }); + } + } catch (Throwable t) { + if (t instanceof ConnectException) { + Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress); + newT.setStackTrace(t.getStackTrace()); + t = newT; + } + closeIfClosed(); + promise.tryFailure(t); + } + } + + private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { + if (promise == null) { + // Closed via cancellation and the promise has been notified already. + return; + } + active = true; + + // trySuccess() will return false if a user cancelled the connection attempt. + boolean promiseSet = promise.trySuccess(); + + // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, + // because what happened is what happened. + if (!wasActive && isActive()) { + pipeline().fireChannelActive(); + } + + // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). + if (!promiseSet) { + close(voidPromise()); + } + } + + private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) { + if (promise == null) { + // Closed via cancellation and the promise has been notified already. + } + + // Use tryFailure() instead of setFailure() to avoid the race against cancel(). + promise.tryFailure(cause); + closeIfClosed(); + } + + private void finishConnect() { + // Note this method is invoked by the event loop only if the connection attempt was + // neither cancelled nor timed out. + + assert eventLoop().inEventLoop(); + + try { + boolean wasActive = isActive(); + doFinishConnect(); + fulfillConnectPromise(connectPromise, wasActive); + } catch (Throwable t) { + if (t instanceof ConnectException) { + Throwable newT = new ConnectException(t.getMessage() + ": " + requestedRemoteAddress); + newT.setStackTrace(t.getStackTrace()); + t = newT; + } + + fulfillConnectPromise(connectPromise, t); + } finally { + // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used + // See https://github.com/netty/netty/issues/1770 + if (connectTimeoutFuture != null) { + connectTimeoutFuture.cancel(false); + } + connectPromise = null; + } + } + + @Override + void epollOutReady() { + if (connectPromise != null) { + // pending connect which is now complete so handle it. + finishConnect(); + } else { + super.epollOutReady(); + } + } + + /** + * Connect to the remote peer + */ + private boolean doConnect(InetSocketAddress remoteAddress, InetSocketAddress localAddress) throws Exception { + if (localAddress != null) { + Native.bind(fd, localAddress.getAddress(), localAddress.getPort()); + } + + boolean success = false; + try { + boolean connected = Native.connect(fd, remoteAddress.getAddress(), + remoteAddress.getPort()); + if (!connected) { + setEpollOut(); + } + success = true; + return connected; + } finally { + if (!success) { + doClose(); + } + } + } + + /** + * Finish the connect + */ + private void doFinishConnect() throws Exception { + Native.finishConnect(fd); + clearEpollOut(); + } + + /** + * Read bytes into the given {@link ByteBuf} and return the amount. + */ + private int doReadBytes(ByteBuf byteBuf) throws Exception { + ByteBuffer buf = byteBuf.internalNioBuffer(0, byteBuf.writableBytes()); + int localReadAmount = Native.read(fd, buf, buf.position(), buf.limit()); + if (localReadAmount > 0) { + byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount); + } + return localReadAmount; + } + + @Override + void epollRdHupReady() { + closeOnRead(pipeline()); + } + + @Override + void epollInReady() { + final ChannelConfig config = config(); + final ChannelPipeline pipeline = pipeline(); + final ByteBufAllocator allocator = config.getAllocator(); + RecvByteBufAllocator.Handle allocHandle = this.allocHandle; + if (allocHandle == null) { + this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); + } + + ByteBuf byteBuf = null; + boolean close = false; + try { + int byteBufCapacity = allocHandle.guess(); + int totalReadAmount = 0; + for (;;) { + // we use a direct buffer here as the native implementations only be able + // to handle direct buffers. + byteBuf = allocator.directBuffer(byteBufCapacity); + int writable = byteBuf.writableBytes(); + int localReadAmount = doReadBytes(byteBuf); + if (localReadAmount <= 0) { + // not was read release the buffer + byteBuf.release(); + close = localReadAmount < 0; + break; + } + pipeline.fireChannelRead(byteBuf); + byteBuf = null; + + if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { + allocHandle.record(totalReadAmount); + + // Avoid overflow. + totalReadAmount = localReadAmount; + } else { + totalReadAmount += localReadAmount; + } + + if (localReadAmount < writable) { + // Read less than what the buffer can hold, + // which might mean we drained the recv buffer completely. + break; + } + } + + pipeline.fireChannelReadComplete(); + allocHandle.record(totalReadAmount); + + if (close) { + closeOnRead(pipeline); + close = false; + } + } catch (Throwable t) { + boolean closed = handleReadException(pipeline, byteBuf, t, close); + if (!closed) { + // trigger a read again as there may be something left to read and because of epoll ET we + // will not get notified again until we read everything from the socket + eventLoop().execute(new Runnable() { + @Override + public void run() { + epollInReady(); + } + }); + } + } finally { + if (!config.isAutoRead()) { + clearEpollIn(); + } + } + } + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java new file mode 100644 index 0000000000..cab8ac0e4b --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java @@ -0,0 +1,280 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelOption; +import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.MessageSizeEstimator; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.socket.SocketChannelConfig; +import io.netty.util.internal.PlatformDependent; + +import java.util.Map; + +import static io.netty.channel.ChannelOption.*; + +public final class EpollSocketChannelConfig extends DefaultChannelConfig + implements SocketChannelConfig { + + protected final EpollSocketChannel channel; + private volatile boolean allowHalfClosure; + + /** + * Creates a new instance. + */ + EpollSocketChannelConfig(EpollSocketChannel channel) { + super(channel); + + this.channel = channel; + if (PlatformDependent.canEnableTcpNoDelayByDefault()) { + setTcpNoDelay(true); + } + } + + @Override + public Map, Object> getOptions() { + return getOptions( + super.getOptions(), + SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS, + ALLOW_HALF_CLOSURE, EpollChannelOption.TCP_CORK); + } + + @SuppressWarnings("unchecked") + @Override + public T getOption(ChannelOption option) { + if (option == SO_RCVBUF) { + return (T) Integer.valueOf(getReceiveBufferSize()); + } + if (option == SO_SNDBUF) { + return (T) Integer.valueOf(getSendBufferSize()); + } + if (option == TCP_NODELAY) { + return (T) Boolean.valueOf(isTcpNoDelay()); + } + if (option == SO_KEEPALIVE) { + return (T) Boolean.valueOf(isKeepAlive()); + } + if (option == SO_REUSEADDR) { + return (T) Boolean.valueOf(isReuseAddress()); + } + if (option == SO_LINGER) { + return (T) Integer.valueOf(getSoLinger()); + } + if (option == IP_TOS) { + return (T) Integer.valueOf(getTrafficClass()); + } + if (option == ALLOW_HALF_CLOSURE) { + return (T) Boolean.valueOf(isAllowHalfClosure()); + } + if (option == EpollChannelOption.TCP_CORK) { + return (T) Boolean.valueOf(isTcpCork()); + } + return super.getOption(option); + } + + @Override + public boolean setOption(ChannelOption option, T value) { + validate(option, value); + + if (option == SO_RCVBUF) { + setReceiveBufferSize((Integer) value); + } else if (option == SO_SNDBUF) { + setSendBufferSize((Integer) value); + } else if (option == TCP_NODELAY) { + setTcpNoDelay((Boolean) value); + } else if (option == SO_KEEPALIVE) { + setKeepAlive((Boolean) value); + } else if (option == SO_REUSEADDR) { + setReuseAddress((Boolean) value); + } else if (option == SO_LINGER) { + setSoLinger((Integer) value); + } else if (option == IP_TOS) { + setTrafficClass((Integer) value); + } else if (option == ALLOW_HALF_CLOSURE) { + setAllowHalfClosure((Boolean) value); + } else if (option == EpollChannelOption.TCP_CORK) { + setTcpCork((Boolean) value); + } else { + return super.setOption(option, value); + } + + return true; + } + + @Override + public int getReceiveBufferSize() { + return Native.getReceiveBufferSize(channel.fd); + } + + @Override + public int getSendBufferSize() { + return Native.getSendBufferSize(channel.fd); + } + + @Override + public int getSoLinger() { + return Native.getSoLinger(channel.fd); + } + + @Override + public int getTrafficClass() { + return Native.getTrafficClass(channel.fd); + } + + @Override + public boolean isKeepAlive() { + return Native.isKeepAlive(channel.fd) == 1; + } + + @Override + public boolean isReuseAddress() { + return Native.isReuseAddress(channel.fd) == 1; + } + + @Override + public boolean isTcpNoDelay() { + return Native.isTcpNoDelay(channel.fd) == 1; + } + + public boolean isTcpCork() { + return Native.isTcpCork(channel.fd) == 1; + } + + @Override + public EpollSocketChannelConfig setKeepAlive(boolean keepAlive) { + Native.setKeepAlive(channel.fd, keepAlive ? 1 : 0); + return this; + } + + @Override + public EpollSocketChannelConfig setPerformancePreferences( + int connectionTime, int latency, int bandwidth) { + return this; + } + + @Override + public EpollSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) { + Native.setReceiveBufferSize(channel.fd, receiveBufferSize); + return this; + } + + @Override + public EpollSocketChannelConfig setReuseAddress(boolean reuseAddress) { + Native.setReuseAddress(channel.fd, reuseAddress ? 1 : 0); + return this; + } + + @Override + public EpollSocketChannelConfig setSendBufferSize(int sendBufferSize) { + Native.setSendBufferSize(channel.fd, sendBufferSize); + return this; + } + + @Override + public EpollSocketChannelConfig setSoLinger(int soLinger) { + Native.setSoLinger(channel.fd, soLinger); + return this; + } + + @Override + public EpollSocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) { + Native.setTcpNoDelay(channel.fd, tcpNoDelay ? 1 : 0); + return this; + } + + public EpollSocketChannelConfig setTcpCork(boolean tcpCork) { + Native.setTcpCork(channel.fd, tcpCork ? 1 : 0); + return this; + } + + @Override + public EpollSocketChannelConfig setTrafficClass(int trafficClass) { + Native.setTrafficClass(channel.fd, trafficClass); + return this; + } + + @Override + public boolean isAllowHalfClosure() { + return allowHalfClosure; + } + + @Override + public EpollSocketChannelConfig setAllowHalfClosure(boolean allowHalfClosure) { + this.allowHalfClosure = allowHalfClosure; + return this; + } + + @Override + public EpollSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { + super.setConnectTimeoutMillis(connectTimeoutMillis); + return this; + } + + @Override + public EpollSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { + super.setMaxMessagesPerRead(maxMessagesPerRead); + return this; + } + + @Override + public EpollSocketChannelConfig setWriteSpinCount(int writeSpinCount) { + super.setWriteSpinCount(writeSpinCount); + return this; + } + + @Override + public EpollSocketChannelConfig setAllocator(ByteBufAllocator allocator) { + super.setAllocator(allocator); + return this; + } + + @Override + public EpollSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { + super.setRecvByteBufAllocator(allocator); + return this; + } + + @Override + public EpollSocketChannelConfig setAutoRead(boolean autoRead) { + super.setAutoRead(autoRead); + return this; + } + + @Override + public EpollSocketChannelConfig setAutoClose(boolean autoClose) { + super.setAutoClose(autoClose); + return this; + } + + @Override + public EpollSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { + super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); + return this; + } + + @Override + public EpollSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { + super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); + return this; + } + + @Override + public EpollSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { + super.setMessageSizeEstimator(estimator); + return this; + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java new file mode 100644 index 0000000000..a45ea67b3a --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -0,0 +1,136 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + + +import io.netty.channel.DefaultFileRegion; +import io.netty.util.internal.NativeLibraryLoader; + +import java.io.IOException; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.Locale; + +/** + * Native helper methods + * + * Internal usage only! + */ +final class Native { + private static final byte[] IPV4_MAPPED_IPV6_PREFIX = { + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, (byte) 0xff, (byte) 0xff }; + + static { + String name = System.getProperty("os.name").toLowerCase(Locale.UK).trim(); + if (!name.startsWith("linux")) { + throw new IllegalStateException("Only supported on Linux"); + } + NativeLibraryLoader.load("netty-transport-native-epoll", Native.class.getClassLoader()); + } + + // EventLoop operations and constants + public static final int EPOLLIN = 0x01; + public static final int EPOLLOUT = 0x02; + public static final int EPOLLACCEPT = 0x04; + public static final int EPOLLRDHUP = 0x08; + + public static native int eventFd(); + public static native void eventFdWrite(int fd, long value); + public static native void eventFdRead(int fd); + public static native int epollCreate(); + public static native int epollWait(int efd, long[] events, int timeout); + public static native void epollCtlAdd(int efd, final int fd, final int flags, final int id); + public static native void epollCtlMod(int efd, final int fd, final int flags, final int id); + public static native void epollCtlDel(int efd, final int fd); + + // File-descriptor operations + public static native void close(int fd) throws IOException; + public static native int write(int fd, ByteBuffer buf, int pos, int limit) throws IOException; + public static native long writev(int fd, ByteBuffer[] buffers, int offset, int length) throws IOException; + public static native int read(int fd, ByteBuffer buf, int pos, int limit) throws IOException; + public static native long sendfile(int dest, DefaultFileRegion src, long offset, long length) throws IOException; + + // socket operations + public static native int socket() throws IOException; + public static void bind(int fd, InetAddress addr, int port) throws IOException { + byte[] address; + int scopeId; + if (addr instanceof Inet6Address) { + address = addr.getAddress(); + scopeId = ((Inet6Address) addr).getScopeId(); + } else { + // convert to ipv4 mapped ipv6 address; + scopeId = 0; + address = ipv4MappedIpv6Address(addr.getAddress()); + } + bind(fd, address, scopeId, port); + } + + private static byte[] ipv4MappedIpv6Address(byte[] ipv4) { + byte[] address = new byte[16]; + System.arraycopy(IPV4_MAPPED_IPV6_PREFIX, 0, address, 0, IPV4_MAPPED_IPV6_PREFIX.length); + System.arraycopy(ipv4, 0, address, 12, ipv4.length); + return address; + } + + public static native void bind(int fd, byte[] address, int scopeId, int port) throws IOException; + public static native void listen(int fd, int backlog) throws IOException; + public static boolean connect(int fd, InetAddress addr, int port) throws IOException { + byte[] address; + int scopeId; + if (addr instanceof Inet6Address) { + address = addr.getAddress(); + scopeId = ((Inet6Address) addr).getScopeId(); + } else { + // convert to ipv4 mapped ipv6 address; + scopeId = 0; + address = ipv4MappedIpv6Address(addr.getAddress()); + } + return connect(fd, address, scopeId, port); + } + public static native boolean connect(int fd, byte[] address, int scopeId, int port) throws IOException; + public static native void finishConnect(int fd) throws IOException; + + public static native InetSocketAddress remoteAddress(int fd); + public static native InetSocketAddress localAddress(int fd); + public static native int accept(int fd) throws IOException; + public static native void shutdown(int fd, boolean read, boolean write) throws IOException; + + // Socket option operations + public static native int getReceiveBufferSize(int fd); + public static native int getSendBufferSize(int fd); + public static native int isKeepAlive(int fd); + public static native int isReuseAddress(int fd); + public static native int isTcpNoDelay(int fd); + public static native int isTcpCork(int fd); + public static native int getSoLinger(int fd); + public static native int getTrafficClass(int fd); + + public static native void setKeepAlive(int fd, int keepAlive); + public static native void setReceiveBufferSize(int fd, int receiveBufferSize); + public static native void setReuseAddress(int fd, int reuseAddress); + public static native void setSendBufferSize(int fd, int sendBufferSize); + public static native void setTcpNoDelay(int fd, int tcpNoDelay); + public static native void setTcpCork(int fd, int tcpCork); + public static native void setSoLinger(int fd, int soLinger); + public static native void setTrafficClass(int fd, int tcpNoDelay); + + private Native() { + // utility + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/package-info.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/package-info.java new file mode 100644 index 0000000000..338a666c53 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * Optimized transport for linux which uses EPOLL Edge-Triggered Mode + * for maximal performance. + */ +package io.netty.channel.epoll; diff --git a/transport-native-epoll/src/main/native-package/m4/custom.m4 b/transport-native-epoll/src/main/native-package/m4/custom.m4 new file mode 100644 index 0000000000..c3ffe7d167 --- /dev/null +++ b/transport-native-epoll/src/main/native-package/m4/custom.m4 @@ -0,0 +1,47 @@ +dnl --------------------------------------------------------------------------- +dnl Copyright 2014 The Netty Project +dnl +dnl Licensed under the Apache License, Version 2.0 (the "License"); +dnl you may not use this file except in compliance with the License. +dnl You may obtain a copy of the License at +dnl +dnl http://www.apache.org/licenses/LICENSE-2.0 +dnl +dnl Unless required by applicable law or agreed to in writing, software +dnl distributed under the License is distributed on an "AS IS" BASIS, +dnl WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +dnl See the License for the specific language governing permissions and +dnl limitations under the License. +dnl --------------------------------------------------------------------------- + +AC_DEFUN([CUSTOM_M4_SETUP], +[ + AC_MSG_CHECKING(which arch to build for) + AC_ARG_WITH([arch], + [AS_HELP_STRING([--with-arch@<:@=ARCH@:>@], + [Build for the specified architecture. Pick from: i386, x86_64.])], + [ + AS_IF(test -n "$withval", [ + ARCH="$withval" + AC_MSG_RESULT([yes, archs: $ARCH]) + ]) + ],[ + ARCH="" + AC_MSG_RESULT([no]) + ]) + AS_IF(test "$ARCH" = "i386", [ + FLAGS="-m32" + ], test "ARCH" = "x86_64", [ + FLAGS="-m64" + ], [ + FLAGS="" + ]) + AS_IF(test -n "$FLAGS", [ + CFLAGS="$FLAGS $CFLAGS" + CXXFLAGS="$FLAGS $CXXFLAGS" + LDFLAGS="$FLAGS $ARCH $LDFLAGS" + AC_SUBST(CFLAGS) + AC_SUBST(CXXFLAGS) + AC_SUBST(LDFLAGS) + ]) +]) \ No newline at end of file diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketEchoTest.java new file mode 100644 index 0000000000..d3c70a1178 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketEchoTest.java @@ -0,0 +1,31 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketEchoTest; + +import java.util.List; + +public class EpollSocketEchoTest extends SocketEchoTest { + + @Override + protected List> newFactories() { + return EpollTestUtils.newFactories(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFileRegionTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFileRegionTest.java new file mode 100644 index 0000000000..cecd2960e9 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFileRegionTest.java @@ -0,0 +1,31 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketFileRegionTest; + +import java.util.List; + +public class EpollSocketFileRegionTest extends SocketFileRegionTest { + + @Override + protected List> newFactories() { + return EpollTestUtils.newFactories(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFixedLengthEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFixedLengthEchoTest.java new file mode 100644 index 0000000000..16392bf284 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFixedLengthEchoTest.java @@ -0,0 +1,31 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketFixedLengthEchoTest; + +import java.util.List; + +public class EpollSocketFixedLengthEchoTest extends SocketFixedLengthEchoTest { + + @Override + protected List> newFactories() { + return EpollTestUtils.newFactories(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketGatheringWriteTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketGatheringWriteTest.java new file mode 100644 index 0000000000..ed5d09413d --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketGatheringWriteTest.java @@ -0,0 +1,31 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketGatheringWriteTest; + +import java.util.List; + +public class EpollSocketGatheringWriteTest extends SocketGatheringWriteTest { + + @Override + protected List> newFactories() { + return EpollTestUtils.newFactories(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketObjectEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketObjectEchoTest.java new file mode 100644 index 0000000000..b6305590ca --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketObjectEchoTest.java @@ -0,0 +1,31 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketObjectEchoTest; + +import java.util.List; + +public class EpollSocketObjectEchoTest extends SocketObjectEchoTest { + + @Override + protected List> newFactories() { + return EpollTestUtils.newFactories(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslEchoTest.java new file mode 100644 index 0000000000..24eccf4f19 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslEchoTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketSslEchoTest; + +import java.util.List; + +public class EpollSocketSslEchoTest extends SocketSslEchoTest { + + public EpollSocketSslEchoTest(boolean serverUsesDelegatedTaskExecutor, + boolean clientUsesDelegatedTaskExecutor, + boolean useChunkedWriteHandler, + boolean useCompositeByteBuf) { + super(serverUsesDelegatedTaskExecutor, clientUsesDelegatedTaskExecutor, + useChunkedWriteHandler, useCompositeByteBuf); + } + + @Override + protected List> newFactories() { + return EpollTestUtils.newFactories(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStartTlsTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStartTlsTest.java new file mode 100644 index 0000000000..c290c6a812 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStartTlsTest.java @@ -0,0 +1,31 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketStartTlsTest; + +import java.util.List; + +public class EpollSocketStartTlsTest extends SocketStartTlsTest { + + @Override + protected List> newFactories() { + return EpollTestUtils.newFactories(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStringEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStringEchoTest.java new file mode 100644 index 0000000000..a8a7bb8889 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStringEchoTest.java @@ -0,0 +1,31 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketStringEchoTest; + +import java.util.List; + +public class EpollSocketStringEchoTest extends SocketStringEchoTest { + + @Override + protected List> newFactories() { + return EpollTestUtils.newFactories(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollTestUtils.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollTestUtils.java new file mode 100644 index 0000000000..e2ba895693 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollTestUtils.java @@ -0,0 +1,46 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.EventLoopGroup; +import io.netty.testsuite.transport.TestsuitePermutation; + +import java.util.Collections; +import java.util.List; + +final class EpollTestUtils { + private static final EventLoopGroup GROUP = new EpollEventLoopGroup(); + static List> newFactories() { + return Collections.>singletonList( + new TestsuitePermutation.BootstrapComboFactory() { + @Override + public ServerBootstrap newServerInstance() { + return new ServerBootstrap().group(GROUP).channel(EpollServerSocketChannel.class); + } + + @Override + public Bootstrap newClientInstance() { + return new Bootstrap().group(GROUP).channel(EpollSocketChannel.class); + } + }); + } + + private EpollTestUtils() { + // utility class + } +}