diff --git a/common/src/main/java/io/netty/util/internal/NativeLibraryLoader.java b/common/src/main/java/io/netty/util/internal/NativeLibraryLoader.java new file mode 100644 index 0000000000..b0b7244989 --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/NativeLibraryLoader.java @@ -0,0 +1,149 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.internal; + +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.util.Locale; +import java.util.regex.Pattern; + +/** + * Helper class to load JNI resources. + * + */ +public final class NativeLibraryLoader { + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(NativeLibraryLoader.class); + + private static final Pattern REPLACE = Pattern.compile("\\W+"); + private static final File WORKDIR; + + static { + String workdir = SystemPropertyUtil.get("io.netty.native.workdir"); + if (workdir != null) { + File f = new File(workdir); + if (!f.exists()) { + // ok to ignore as createTempFile will take care + //noinspection ResultOfMethodCallIgnored + f.mkdirs(); + } + + try { + f = f.getAbsoluteFile(); + } catch (Exception ignored) { + // Good to have an absolute path, but it's OK. + } + + WORKDIR = f; + logger.debug("-Dio.netty.netty.workdir: {}", WORKDIR); + } else { + WORKDIR = PlatformDependent.tmpdir(); + logger.debug("-Dio.netty.netty.workdir: {} (io.netty.tmpdir)", WORKDIR); + } + } + + /** + * Load the given library with the specified {@link java.lang.ClassLoader} + */ + public static void load(String name, ClassLoader loader) { + String libname = System.mapLibraryName(name); + String path = "META-INF/native/" + osIdentifier() + PlatformDependent.bitMode() + '/' + libname; + + URL url = loader.getResource(path); + if (url == null) { + // Fall back to normal loading of JNI stuff + System.loadLibrary(name); + } else { + int index = libname.lastIndexOf('.'); + String prefix = libname.substring(0, index); + String suffix = libname.substring(index, libname.length()); + InputStream in = null; + OutputStream out = null; + File tmpFile = null; + boolean loaded = false; + try { + tmpFile = File.createTempFile(prefix, suffix, WORKDIR); + in = url.openStream(); + out = new FileOutputStream(tmpFile); + + byte[] buffer = new byte[8192]; + int length; + while ((length = in.read(buffer)) > 0) { + out.write(buffer, 0, length); + } + out.flush(); + out.close(); + out = null; + + System.load(tmpFile.getPath()); + loaded = true; + } catch (Exception e) { + throw (UnsatisfiedLinkError) new UnsatisfiedLinkError( + "could not load a native library: " + name).initCause(e); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException ignore) { + // ignore + } + } + if (out != null) { + try { + out.close(); + } catch (IOException ignore) { + // ignore + } + } + if (tmpFile != null) { + if (loaded) { + tmpFile.deleteOnExit(); + } else { + if (!tmpFile.delete()) { + tmpFile.deleteOnExit(); + } + } + } + } + } + } + + private static String osIdentifier() { + String name = SystemPropertyUtil.get("os.name", "unknown").toLowerCase(Locale.US).trim(); + if (name.startsWith("win")) { + return "windows"; + } + if (name.startsWith("mac os x")) { + return "osx"; + } + if (name.startsWith("linux")) { + return "linux"; + } + + return REPLACE.matcher(name).replaceAll("_"); + } + + private NativeLibraryLoader() { + // Utility + } +} diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent.java b/common/src/main/java/io/netty/util/internal/PlatformDependent.java index a414c578c0..2b2de647a3 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent.java @@ -21,6 +21,7 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.io.BufferedReader; +import java.io.File; import java.io.IOException; import java.io.InputStreamReader; import java.lang.reflect.Field; @@ -74,6 +75,10 @@ public final class PlatformDependent { private static final boolean HAS_JAVASSIST = hasJavassist0(); + private static final File TMPDIR = tmpdir0(); + + private static final int BIT_MODE = bitMode0(); + static { if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.noPreferDirect: {}", !DIRECT_BUFFER_PREFERRED); @@ -153,6 +158,20 @@ public final class PlatformDependent { return HAS_JAVASSIST; } + /** + * Returns the temporary directory. + */ + public static File tmpdir() { + return TMPDIR; + } + + /** + * Returns the bit mode of the current VM (usually 32 or 64.) + */ + public static int bitMode() { + return BIT_MODE; + } + /** * Raises an exception bypassing compiler checks for checked exceptions. */ @@ -638,6 +657,130 @@ public final class PlatformDependent { } } + private static File tmpdir0() { + File f; + try { + f = toDirectory(SystemPropertyUtil.get("io.netty.tmpdir")); + if (f != null) { + logger.debug("-Dio.netty.tmpdir: {}", f); + return f; + } + + f = toDirectory(SystemPropertyUtil.get("java.io.tmpdir")); + if (f != null) { + logger.debug("-Dio.netty.tmpdir: {} (java.io.tmpdir)", f); + return f; + } + + // This shouldn't happen, but just in case .. + if (isWindows()) { + f = toDirectory(System.getenv("TEMP")); + if (f != null) { + logger.debug("-Dio.netty.tmpdir: {} (%TEMP%)", f); + return f; + } + + String userprofile = System.getenv("USERPROFILE"); + if (userprofile != null) { + f = toDirectory(userprofile + "\\AppData\\Local\\Temp"); + if (f != null) { + logger.debug("-Dio.netty.tmpdir: {} (%USERPROFILE%\\AppData\\Local\\Temp)", f); + return f; + } + + f = toDirectory(userprofile + "\\Local Settings\\Temp"); + if (f != null) { + logger.debug("-Dio.netty.tmpdir: {} (%USERPROFILE%\\Local Settings\\Temp)", f); + return f; + } + } + } else { + f = toDirectory(System.getenv("TMPDIR")); + if (f != null) { + logger.debug("-Dio.netty.tmpdir: {} ($TMPDIR)", f); + return f; + } + } + } catch (Exception ignored) { + // Environment variable inaccessible + } + + // Last resort. + if (isWindows()) { + f = new File("C:\\Windows\\Temp"); + } else { + f = new File("/tmp"); + } + + logger.warn("Failed to get the temporary directory; falling back to: {}", f); + return f; + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + private static File toDirectory(String path) { + if (path == null) { + return null; + } + + File f = new File(path); + if (!f.exists()) { + f.mkdirs(); + } + + if (!f.isDirectory()) { + return null; + } + + try { + return f.getAbsoluteFile(); + } catch (Exception ignored) { + return f; + } + } + + private static int bitMode0() { + // Check user-specified bit mode first. + int bitMode = SystemPropertyUtil.getInt("io.netty.bitMode", 0); + if (bitMode > 0) { + logger.debug("-Dio.netty.bitMode: {}", bitMode); + return bitMode; + } + + // And then the vendor specific ones which is probably most reliable. + bitMode = SystemPropertyUtil.getInt("sun.arch.data.model", 0); + if (bitMode > 0) { + logger.debug("-Dio.netty.bitMode: {} (sun.arch.data.model)", bitMode); + return bitMode; + } + bitMode = SystemPropertyUtil.getInt("com.ibm.vm.bitmode", 0); + if (bitMode > 0) { + logger.debug("-Dio.netty.bitMode: {} (com.ibm.vm.bitmode)", bitMode); + return bitMode; + } + + // os.arch also gives us a good hint. + String arch = SystemPropertyUtil.get("os.arch", "").toLowerCase(Locale.US).trim(); + if ("amd64".equals(arch) || "x86_64".equals(arch)) { + bitMode = 64; + } else if ("i386".equals(arch) || "i486".equals(arch) || "i586".equals(arch) || "i686".equals(arch)) { + bitMode = 32; + } + + if (bitMode > 0) { + logger.debug("-Dio.netty.bitMode: {} (os.arch: {})", bitMode, arch); + } + + // Last resort: guess from VM name and then fall back to most common 64-bit mode. + String vm = SystemPropertyUtil.get("java.vm.name", "").toLowerCase(Locale.US); + Pattern BIT_PATTERN = Pattern.compile("([1-9][0-9]+)-?bit"); + Matcher m = BIT_PATTERN.matcher(vm); + if (m.find()) { + return Integer.parseInt(m.group(1)); + } else { + return 64; + } + } + private PlatformDependent() { // only static method supported } diff --git a/pom.xml b/pom.xml index 5db3ebe132..67944cfc14 100644 --- a/pom.xml +++ b/pom.xml @@ -120,6 +120,17 @@ false + + linux-native + + + linux + + + + transport-native-epoll + + @@ -498,6 +509,10 @@ manifest + + jar + bundle + ${project.groupId}.* @@ -564,7 +579,7 @@ 2.4.2 false - -P release,sonatype-oss-release,full,no-osgi + -P release,sonatype-oss-release,full,no-osgi,linux-native true false netty-@{project.version} diff --git a/transport-native-epoll/README.md b/transport-native-epoll/README.md new file mode 100644 index 0000000000..43f97ee933 --- /dev/null +++ b/transport-native-epoll/README.md @@ -0,0 +1,3 @@ +# Native transport for Linux + +See [our wiki page](http://netty.io/wiki/native-transports.html). diff --git a/transport-native-epoll/pom.xml b/transport-native-epoll/pom.xml new file mode 100644 index 0000000000..832e1bb99c --- /dev/null +++ b/transport-native-epoll/pom.xml @@ -0,0 +1,113 @@ + + + + 4.0.0 + + io.netty + netty-parent + 4.0.16.Final-SNAPSHOT + + netty-transport-native-epoll + + Netty/Transport/Native/Epoll + jar + + + + io.netty + netty-common + ${project.version} + + + io.netty + netty-buffer + ${project.version} + + + io.netty + netty-transport + ${project.version} + + + io.netty + netty-testsuite + ${project.version} + test-jar + test + + + junit + junit + + + + + + + org.fusesource.hawtjni + maven-hawtjni-plugin + 1.10 + + + build-linux64 + + ${project.artifactId} + ${project.build.directory}/linux64 + ${nativeSourceDirectory} + ${libDirectory} + + --with-arch=x86_64 + + linux64 + true + true + + + generate + build + + compile + + + build-linux32 + + ${project.build.directory}/linux32 + ${nativeSourceDirectory} + ${libDirectory} + ${project.artifactId} + + --with-arch=i386 + + linux32 + true + true + + + generate + build + + compile + + + + + + + ${basedir}/src/main/c + ${basedir}/target/classes/ + + diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c new file mode 100644 index 0000000000..99c77e260d --- /dev/null +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c @@ -0,0 +1,859 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "io_netty_channel_epoll_Native.h" + + +// optional +extern int accept4(int sockFd, struct sockaddr *addr, socklen_t *addrlen, int flags) __attribute__((weak)); + +// Those are initialized in the init(...) method and cached for performance reasons +jmethodID posId = NULL; +jmethodID limitId = NULL; +jfieldID posFieldId = NULL; +jfieldID limitFieldId = NULL; +jfieldID fileChannelFieldId = NULL; +jfieldID transferedFieldId = NULL; +jfieldID fdFieldId = NULL; +jfieldID fileDescriptorFieldId = NULL; +jmethodID inetSocketAddrMethodId = NULL; +jclass runtimeExceptionClass = NULL; +jclass ioExceptionClass = NULL; +jclass closedChannelExceptionClass = NULL; +jmethodID closedChannelExceptionMethodId = NULL; +jclass inetSocketAddressClass = NULL; +static int socketType; + +// util methods +void throwRuntimeException(JNIEnv *env, char *message) { + (*env)->ThrowNew(env, runtimeExceptionClass, message); +} + +void throwIOException(JNIEnv *env, char *message) { + (*env)->ThrowNew(env, ioExceptionClass, message); +} + +void throwClosedChannelException(JNIEnv *env) { + jobject exception = (*env)->NewObject(env, closedChannelExceptionClass, closedChannelExceptionMethodId); + (*env)->Throw(env, exception); +} + +void throwOutOfMemoryError( JNIEnv *env, char *message) { + jclass exceptionClass = (*env)->FindClass(env, "java/lang/OutOfMemoryError"); + (*env)->ThrowNew(env, exceptionClass, message); +} + +char *exceptionMessage(char *msg, int error) { + char *err = strerror(error); + char *result = malloc(strlen(msg) + strlen(err) + 1); + strcpy(result, msg); + strcat(result, err); + return result; +} + +jint epollCtl(JNIEnv * env, jint efd, int op, jint fd, jint flags, jint id) { + uint32_t events = EPOLLET; + + if (flags & EPOLL_ACCEPT) { + events |= EPOLLIN; + } + if (flags & EPOLL_READ) { + events |= EPOLLIN | EPOLLRDHUP; + } + if (flags & EPOLL_WRITE) { + events |= EPOLLOUT; + } + + struct epoll_event ev = { + .events = events, + // encode the id into the events + .data.u64 = (((uint64_t) id) << 32L) + }; + + return epoll_ctl(efd, op, fd, &ev); +} + +jint getOption(JNIEnv *env, jint fd, int level, int optname, const void *optval, socklen_t optlen) { + int code; + code = getsockopt(fd, level, optname, &optval, &optlen); + if (code == 0) { + return 0; + } + int err = errno; + throwRuntimeException(env, exceptionMessage("Error during getsockopt(...): ", err)); + return code; +} + +int setOption(JNIEnv *env, jint fd, int level, int optname, const void *optval, socklen_t len) { + int rc = setsockopt(fd, level, optname, optval, len); + if (rc < 0) { + int err = errno; + throwRuntimeException(env, exceptionMessage("Error during setsockopt(...): ", err)); + } + return rc; +} + +jobject createInetSocketAddress(JNIEnv * env, struct sockaddr_storage addr) { + char ipstr[INET6_ADDRSTRLEN]; + int port; + if (addr.ss_family == AF_INET) { + struct sockaddr_in *s = (struct sockaddr_in *)&addr; + port = ntohs(s->sin_port); + inet_ntop(AF_INET, &s->sin_addr, ipstr, sizeof ipstr); + } else { + struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr; + port = ntohs(s->sin6_port); + inet_ntop(AF_INET6, &s->sin6_addr, ipstr, sizeof ipstr); + } + jobject socketAddr = (*env)->NewObject(env, inetSocketAddressClass, inetSocketAddrMethodId, ipstr, port); + return socketAddr; +} + +void init_sockaddr(JNIEnv * env, jbyteArray address, jint scopeId, jint jport, struct sockaddr_storage * addr) { + uint16_t port = htons((uint16_t) jport); + jbyte* addressBytes = (*env)->GetByteArrayElements(env, address, 0); + if (socketType == AF_INET6) { + struct sockaddr_in6* ip6addr = (struct sockaddr_in6 *) addr; + ip6addr->sin6_family = AF_INET6; + ip6addr->sin6_port = port; + + if (scopeId != 0) { + ip6addr->sin6_scope_id = (uint32_t) scopeId; + } + memcpy( &(ip6addr->sin6_addr.s6_addr), addressBytes, 16); + } else { + struct sockaddr_in* ipaddr = (struct sockaddr_in *) addr; + ipaddr->sin_family = AF_INET; + ipaddr->sin_port = port; + memcpy( &(ipaddr->sin_addr.s_addr), addressBytes + 12, 4); + } + + (*env)->ReleaseByteArrayElements(env, address, addressBytes, JNI_ABORT); +} + +static int socket_type() { + int fd = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0); + if (fd == -1) { + if (errno == EAFNOSUPPORT) { + return AF_INET; + } + return AF_INET6; + } else { + close(fd); + return AF_INET6; + } +} +// util methods end + +jint JNI_OnLoad(JavaVM* vm, void* reserved) { + JNIEnv* env; + if ((*vm)->GetEnv(vm, (void **) &env, JNI_VERSION_1_6) != JNI_OK) { + return JNI_ERR; + } else { + // cache classes that are used within other jni methods for performance reasons + jclass localClosedChannelExceptionClass = (*env)->FindClass(env, "java/nio/channels/ClosedChannelException"); + if (localClosedChannelExceptionClass == NULL) { + // pending exception... + return JNI_ERR; + } + closedChannelExceptionClass = (jclass) (*env)->NewGlobalRef(env, localClosedChannelExceptionClass); + if (closedChannelExceptionClass == NULL) { + // out-of-memory! + throwOutOfMemoryError(env, "Error allocating memory"); + return JNI_ERR; + } + closedChannelExceptionMethodId = (*env)->GetMethodID(env, closedChannelExceptionClass, "", "()V"); + if (closedChannelExceptionMethodId == NULL) { + throwRuntimeException(env, "Unable to obtain constructor of ClosedChannelException"); + return JNI_ERR; + } + jclass localRuntimeExceptionClass = (*env)->FindClass(env, "java/lang/RuntimeException"); + if (localRuntimeExceptionClass == NULL) { + // pending exception... + return JNI_ERR; + } + runtimeExceptionClass = (jclass) (*env)->NewGlobalRef(env, localRuntimeExceptionClass); + if (runtimeExceptionClass == NULL) { + // out-of-memory! + throwOutOfMemoryError(env, "Error allocating memory"); + return JNI_ERR; + } + + jclass localIoExceptionClass = (*env)->FindClass(env, "java/io/IOException"); + if (localIoExceptionClass == NULL) { + // pending exception... + return JNI_ERR; + } + ioExceptionClass = (jclass) (*env)->NewGlobalRef(env, localIoExceptionClass); + if (ioExceptionClass == NULL) { + // out-of-memory! + throwOutOfMemoryError(env, "Error allocating memory"); + return JNI_ERR; + } + + jclass localInetSocketAddressClass = (*env)->FindClass(env, "java/net/InetSocketAddress"); + if (localIoExceptionClass == NULL) { + // pending exception... + return JNI_ERR; + } + inetSocketAddressClass = (jclass) (*env)->NewGlobalRef(env, localInetSocketAddressClass); + if (inetSocketAddressClass == NULL) { + // out-of-memory! + throwOutOfMemoryError(env, "Error allocating memory"); + return JNI_ERR; + } + + void *mem = malloc(1); + if (mem == NULL) { + throwOutOfMemoryError(env, "Error allocating native buffer"); + return JNI_ERR; + } + jobject directBuffer = (*env)->NewDirectByteBuffer(env, mem, 1); + if (directBuffer == NULL) { + throwOutOfMemoryError(env, "Error allocating native buffer"); + return JNI_ERR; + } + + jclass cls = (*env)->GetObjectClass(env, directBuffer); + + // Get the method id for Buffer.position() and Buffer.limit(). These are used as fallback if + // it is not possible to obtain the position and limit using the fields directly. + posId = (*env)->GetMethodID(env, cls, "position", "()I"); + if (posId == NULL) { + // position method was not found.. something is wrong so bail out + throwRuntimeException(env, "Unable to find method ByteBuffer.position()"); + return JNI_ERR; + } + + limitId = (*env)->GetMethodID(env, cls, "limit", "()I"); + if (limitId == NULL) { + // limit method was not found.. something is wrong so bail out + throwRuntimeException(env, "Unable to find method ByteBuffer.limit()"); + return JNI_ERR; + } + + // Try to get the ids of the position and limit fields. We later then check if we was able + // to find them and if so use them get the position and limit of the buffer. This is + // much faster then call back into java via (*env)->CallIntMethod(...). + posFieldId = (*env)->GetFieldID(env, cls, "position", "I"); + if (posFieldId == NULL) { + // this is ok as we can still use the method so just clear the exception + (*env)->ExceptionClear(env); + } + limitFieldId = (*env)->GetFieldID(env, cls, "limit", "I"); + if (limitFieldId == NULL) { + // this is ok as we can still use the method so just clear the exception + (*env)->ExceptionClear(env); + } + jclass fileRegionCls = (*env)->FindClass(env, "io/netty/channel/DefaultFileRegion"); + if (fileRegionCls == NULL) { + // pending exception... + return JNI_ERR; + } + fileChannelFieldId = (*env)->GetFieldID(env, fileRegionCls, "file", "Ljava/nio/channels/FileChannel;"); + if (fileChannelFieldId == NULL) { + throwRuntimeException(env, "Unable to obtain FileChannel field for DefaultFileRegion"); + return JNI_ERR; + } + transferedFieldId = (*env)->GetFieldID(env, fileRegionCls, "transfered", "J"); + if (transferedFieldId == NULL) { + throwRuntimeException(env, "Unable to obtain transfered field for DefaultFileRegion"); + return JNI_ERR; + } + + jclass fileChannelCls = (*env)->FindClass(env, "sun/nio/ch/FileChannelImpl"); + if (fileChannelCls == NULL) { + // pending exception... + return JNI_ERR; + } + fileDescriptorFieldId = (*env)->GetFieldID(env, fileChannelCls, "fd", "Ljava/io/FileDescriptor;"); + if (fileDescriptorFieldId == NULL) { + throwRuntimeException(env, "Unable to obtain fd field for FileChannelImpl"); + return JNI_ERR; + } + + jclass fileDescriptorCls = (*env)->FindClass(env, "java/io/FileDescriptor"); + if (fileDescriptorCls == NULL) { + // pending exception... + return JNI_ERR; + } + fdFieldId = (*env)->GetFieldID(env, fileDescriptorCls, "fd", "I"); + if (fdFieldId == NULL) { + throwRuntimeException(env, "Unable to obtain fd field for FileDescriptor"); + return JNI_ERR; + } + + inetSocketAddrMethodId = (*env)->GetMethodID(env, inetSocketAddressClass, "", "(Ljava/lang/String;I)V"); + if (inetSocketAddrMethodId == NULL) { + throwRuntimeException(env, "Unable to obtain constructor of InetSocketAddress"); + return JNI_ERR; + } + socketType = socket_type(); + return JNI_VERSION_1_6; + } +} + +void JNI_OnUnload(JavaVM *vm, void *reserved) { + JNIEnv* env; + if ((*vm)->GetEnv(vm, (void **) &env, JNI_VERSION_1_6) != JNI_OK) { + // Something is wrong but nothing we can do about this :( + return; + } else { + // delete global references so the GC can collect them + if (runtimeExceptionClass != NULL) { + (*env)->DeleteGlobalRef(env, runtimeExceptionClass); + } + if (ioExceptionClass != NULL) { + (*env)->DeleteGlobalRef(env, ioExceptionClass); + } + if (closedChannelExceptionClass != NULL) { + (*env)->DeleteGlobalRef(env, closedChannelExceptionClass); + } + if (inetSocketAddressClass != NULL) { + (*env)->DeleteGlobalRef(env, inetSocketAddressClass); + } + } +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_eventFd(JNIEnv * env, jclass clazz) { + jint eventFD = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + + if (eventFD < 0) { + int err = errno; + throwRuntimeException(env, exceptionMessage("Error creating eventFD(...): ", err)); + } + return eventFD; +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_eventFdWrite(JNIEnv * env, jclass clazz, jint fd, jlong value) { + jint eventFD = eventfd_write(fd, (eventfd_t)value); + + if (eventFD < 0) { + int err = errno; + throwRuntimeException(env, exceptionMessage("Error calling eventfd_write(...): ", err)); + } +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_eventFdRead(JNIEnv * env, jclass clazz, jint fd) { + uint64_t eventfd_t; + + if (eventfd_read(fd, &eventfd_t) != 0) { + // something is serious wrong + throwRuntimeException(env, "Error calling eventfd_read(...)"); + } +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollCreate(JNIEnv * env, jclass clazz) { + jint efd = epoll_create1(EPOLL_CLOEXEC); + if (efd < 0) { + int err = errno; + throwRuntimeException(env, exceptionMessage("Error during epoll_create(...): ", err)); + } + return efd; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollWait(JNIEnv * env, jclass clazz, jint efd, jlongArray events, jint timeout) { + int len = (*env)->GetArrayLength(env, events); + struct epoll_event ev[len]; + int ready; + int err; + do { + ready = epoll_wait(efd, ev, len, timeout); + // was interrupted try again. + } while (ready == -1 && (( err = errno) == EINTR)); + + if (ready < 0) { + throwIOException(env, exceptionMessage("Error during epoll_wait(...): ", err)); + return -1; + } + if (ready == 0) { + // nothing ready for process + return 0; + } + + jboolean isCopy; + jlong *elements = (*env)->GetLongArrayElements(env, events, &isCopy); + if (elements == NULL) { + // No memory left ?!?!? + throwOutOfMemoryError(env, "Can't allocate memory"); + return -1; + } + int i; + for (i = 0; i < ready; i++) { + // store the ready ops and id + elements[i] = (jlong) ev[i].data.u64; + if (ev[i].events & EPOLLIN) { + elements[i] |= EPOLL_READ; + } + if (ev[i].events & EPOLLRDHUP) { + elements[i] |= EPOLL_RDHUP; + } + if (ev[i].events & EPOLLOUT) { + elements[i] |= EPOLL_WRITE; + } + } + jint mode; + // release again to prevent memory leak + if (isCopy) { + mode = 0; + } else { + // was just pinned so use JNI_ABORT to eliminate not needed copy. + mode = JNI_ABORT; + } + (*env)->ReleaseLongArrayElements(env, events, elements, mode); + + return ready; +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlAdd(JNIEnv * env, jclass clazz, jint efd, jint fd, jint flags, jint id) { + if (epollCtl(env, efd, EPOLL_CTL_ADD, fd, flags, id) < 0) { + int err = errno; + throwRuntimeException(env, exceptionMessage("Error during calling epoll_ctl(...): ", err)); + } +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlMod(JNIEnv * env, jclass clazz, jint efd, jint fd, jint flags, jint id) { + if (epollCtl(env, efd, EPOLL_CTL_MOD, fd, flags, id) < 0) { + int err = errno; + throwRuntimeException(env, exceptionMessage("Error during calling epoll_ctl(...): ", err)); + } +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv * env, jclass clazz, jint efd, jint fd) { + // Create an empty event to workaround a bug in older kernels which can not handle NULL. + struct epoll_event event = { 0 }; + if (epoll_ctl(efd, EPOLL_CTL_DEL, fd, &event) < 0) { + int err = errno; + throwRuntimeException(env, exceptionMessage("Error during calling epoll_ctl(...): ", err)); + } +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit) { + // TODO: We could also maybe pass the address in directly and so eliminate this call + // not sure if this would buy us much. So some testing is needed. + void *buffer = (*env)->GetDirectBufferAddress(env, jbuffer); + if (buffer == NULL) { + throwRuntimeException(env, "Unable to access address of buffer"); + return -1; + } + ssize_t res; + int err; + do { + res = write(fd, buffer + pos, (size_t) (limit - pos)); + // keep on writing if it was interrupted + } while(res == -1 && ((err = errno) == EINTR)); + + if (res < 0) { + // network stack saturated... try again later + if (err == EAGAIN || err == EWOULDBLOCK) { + return 0; + } + if (err == EBADF) { + throwClosedChannelException(env); + return -1; + } + throwIOException(env, exceptionMessage("Error while write(...): ", err)); + return -1; + } + return (jint) res; +} + +JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length) { + struct iovec iov[length]; + int i; + int iovidx = 0; + for (i = offset; i < length; i++) { + jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i); + jint pos; + // Get the current position using the (*env)->GetIntField if possible and fallback + // to slower (*env)->CallIntMethod(...) if needed + if (posFieldId == NULL) { + pos = (*env)->CallIntMethod(env, bufObj, posId, NULL); + } else { + pos = (*env)->GetIntField(env, bufObj, posFieldId); + } + jint limit; + // Get the current limit using the (*env)->GetIntField if possible and fallback + // to slower (*env)->CallIntMethod(...) if needed + if (limitFieldId == NULL) { + limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL); + } else { + limit = (*env)->GetIntField(env, bufObj, limitFieldId); + } + void *buffer = (*env)->GetDirectBufferAddress(env, bufObj); + if (buffer == NULL) { + throwRuntimeException(env, "Unable to access address of buffer"); + return -1; + } + iov[iovidx].iov_base = buffer + pos; + iov[iovidx].iov_len = (size_t) (limit - pos); + iovidx++; + } + + ssize_t res; + int err; + do { + res = writev(fd, iov, length); + // keep on writing if it was interrupted + } while(res == -1 && ((err = errno) == EINTR)); + + if (res < 0) { + if (err == EAGAIN || err == EWOULDBLOCK) { + // network stack is saturated we will try again later + return 0; + } + if (err == EBADF) { + throwClosedChannelException(env); + return -1; + } + throwIOException(env, exceptionMessage("Error while write(...): ", err)); + return -1; + } + return res; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit) { + // TODO: We could also maybe pass the address in directly and so eliminate this call + // not sure if this would buy us much. So some testing is needed. + void *buffer = (*env)->GetDirectBufferAddress(env, jbuffer); + if (buffer == NULL) { + throwRuntimeException(env, "Unable to access address of buffer"); + return -1; + } + ssize_t res; + int err; + do { + res = read(fd, buffer + pos, (size_t) (limit - pos)); + // Keep on reading if we was interrupted + } while (res == -1 && ((err = errno) == EINTR)); + + if (res < 0) { + if (err == EAGAIN || err == EWOULDBLOCK) { + // Nothing left to read + return 0; + } + if (err == EBADF) { + throwClosedChannelException(env); + return -1; + } + throwIOException(env, exceptionMessage("Error while read(...): ", err)); + return -1; + } + + if (res == 0) { + // end-of-stream + return -1; + } + return (jint) res; +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_close(JNIEnv * env, jclass clazz, jint fd) { + if (close(fd) < 0) { + throwIOException(env, "Error closing file descriptor"); + } +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_shutdown(JNIEnv * env, jclass clazz, jint fd, jboolean read, jboolean write) { + int mode; + if (read && write) { + mode = SHUT_RDWR; + } else if (read) { + mode = SHUT_RD; + } else if (write) { + mode = SHUT_WR; + } + if (shutdown(fd, mode) < 0) { + throwIOException(env, "Error shutdown socket file descriptor"); + } +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_socket(JNIEnv * env, jclass clazz) { + // TODO: Maybe also respect -Djava.net.preferIPv4Stack=true + int fd = socket(socketType, SOCK_STREAM | SOCK_NONBLOCK, 0); + if (fd == -1) { + int err = errno; + throwIOException(env, exceptionMessage("Error creating socket: ", err)); + return -1; + } else if (socketType == AF_INET6){ + // Allow to listen /connect ipv4 and ipv6 + int optval = 0; + if (setOption(env, fd, IPPROTO_IPV6, IPV6_V6ONLY, &optval, sizeof(optval)) < 0) { + // Something went wrong so close the fd and return here. setOption(...) itself throws the exception already. + close(fd); + return -1; + } + } + return fd; +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_bind(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port) { + struct sockaddr_storage addr; + init_sockaddr(env, address, scopeId, port, &addr); + + if(bind(fd, (struct sockaddr *) &addr, sizeof(addr)) == -1){ + int err = errno; + throwIOException(env, exceptionMessage("Error during bind(...): ", err)); + } +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_listen(JNIEnv * env, jclass clazz, jint fd, jint backlog) { + if(listen(fd, backlog) == -1) { + int err = errno; + throwIOException(env, exceptionMessage("Error during listen(...): ", err)); + } +} + +JNIEXPORT jboolean JNICALL Java_io_netty_channel_epoll_Native_connect(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port) { + struct sockaddr_storage addr; + init_sockaddr(env, address, scopeId, port, &addr); + + int res; + int err; + do { + res = connect(fd, (struct sockaddr *) &addr, sizeof(addr)); + } while (res == -1 && ((err = errno) == EINTR)); + + if (res < 0) { + if (err == EINPROGRESS) { + // connect not complete yet need to wait for EPOLLOUT event + return JNI_FALSE; + } + throwIOException(env, exceptionMessage("Unable to connect to remote host: ", err)); + + return JNI_FALSE; + } + return JNI_TRUE; +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_finishConnect(JNIEnv * env, jclass clazz, jint fd) { + // connect done, check for error + int optval; + int res = getOption(env, fd, SOL_SOCKET, SO_ERROR, &optval, sizeof(optval)); + if (res == 0) { + return; + } + throwIOException(env, exceptionMessage("Unable to connect to remote host: ", optval)); +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_accept(JNIEnv * env, jclass clazz, jint fd) { + jint socketFd; + int err; + + do { + if (accept4) { + socketFd = accept4(fd, NULL, 0, SOCK_NONBLOCK | SOCK_CLOEXEC); + } else { + socketFd = accept(fd, NULL, 0); + } + } while (socketFd == -1 && ((err = errno) == EINTR)); + + if (socketFd == -1) { + if (err == EAGAIN || err == EWOULDBLOCK) { + // Everything consumed so just return -1 here. + return -1; + } else { + throwIOException(env, exceptionMessage("Error during accept(...): ", err)); + return -1; + } + } + if (accept4) { + return socketFd; + } else { + // accept4 was not present so need two more sys-calls ... + if (fcntl(socketFd, F_SETFD, FD_CLOEXEC) == -1) { + throwIOException(env, exceptionMessage("Error during accept(...): ", err)); + return -1; + } + if (fcntl(socketFd, F_SETFL, O_NONBLOCK) == -1) { + throwIOException(env, exceptionMessage("Error during accept(...): ", err)); + return -1; + } + } + return socketFd; +} + +JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_sendfile(JNIEnv *env, jclass clazz, jint fd, jobject fileRegion, jlong off, jlong len) { + jobject fileChannel = (*env)->GetObjectField(env, fileRegion, fileChannelFieldId); + if (fileChannel == NULL) { + throwRuntimeException(env, "Unable to obtain FileChannel from FileRegion"); + return -1; + } + jobject fileDescriptor = (*env)->GetObjectField(env, fileChannel, fileDescriptorFieldId); + if (fileDescriptor == NULL) { + throwRuntimeException(env, "Unable to obtain FileDescriptor from FileChannel"); + return -1; + } + jint srcFd = (*env)->GetIntField(env, fileDescriptor, fdFieldId); + if (srcFd == -1) { + throwRuntimeException(env, "Unable to obtain the fd from the FileDescriptor"); + return -1; + } + ssize_t res; + off_t offset = off; + int err; + do { + res = sendfile(fd, srcFd, &offset, (size_t) len); + } while (res == -1 && ((err = errno) == EINTR)); + if (res < 0) { + if (err == EAGAIN) { + return 0; + } + throwIOException(env, exceptionMessage("Error during accept(...): ", err)); + return -1; + } + if (res > 0) { + // update the transfered field in DefaultFileRegion + (*env)->SetLongField(env, fileRegion, transferedFieldId, off + res); + } + + return res; +} + +JNIEXPORT jobject JNICALL Java_io_netty_channel_epoll_Native_remoteAddress(JNIEnv * env, jclass clazz, jint fd) { + socklen_t len; + struct sockaddr_storage addr; + + len = sizeof addr; + if (getpeername(fd, (struct sockaddr*)&addr, &len) == -1) { + return NULL; + } + return createInetSocketAddress(env, addr); +} + +JNIEXPORT jobject JNICALL Java_io_netty_channel_epoll_Native_localAddress(JNIEnv * env, jclass clazz, jint fd) { + socklen_t len; + struct sockaddr_storage addr; + + len = sizeof addr; + if (getsockname(fd, (struct sockaddr*)&addr, &len) == -1) { + return NULL; + } + return createInetSocketAddress(env, addr); +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setReuseAddress(JNIEnv * env, jclass clazz, jint fd, jint optval) { + setOption(env, fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpNoDelay(JNIEnv *env, jclass clazz, jint fd, jint optval) { + setOption(env, fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)); +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setReceiveBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval) { + setOption(env, fd, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval)); +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setSendBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval) { + setOption(env, fd, SOL_SOCKET, SO_SNDBUF, &optval, sizeof(optval)); +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setKeepAlive(JNIEnv *env, jclass clazz, jint fd, jint optval) { + setOption(env, fd, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)); +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv *env, jclass clazz, jint fd, jint optval) { + setOption(env, fd, SOL_TCP, TCP_CORK, &optval, sizeof(optval)); +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setSoLinger(JNIEnv *env, jclass clazz, jint fd, jint optval) { + setOption(env, fd, IPPROTO_IP, IP_TOS, &optval, sizeof(optval)); +} + +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTrafficClass(JNIEnv *env, jclass clazz, jint fd, jint optval) { + struct linger solinger; + if (optval < 0) { + solinger.l_onoff = 0; + solinger.l_linger = 0; + } else { + solinger.l_onoff = 1; + solinger.l_linger = optval; + } + setOption(env, fd, SOL_SOCKET, SO_LINGER, &solinger, sizeof(solinger)); +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isReuseAddresss(JNIEnv *env, jclass clazz, jint fd) { + int optval; + if (getOption(env, fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isTcpNoDelay(JNIEnv *env, jclass clazz, jint fd) { + int optval; + if (getOption(env, fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getReceiveBufferSize(JNIEnv * env, jclass clazz, jint fd) { + int optval; + if (getOption(env, fd, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getSendBufferSize(JNIEnv *env, jclass clazz, jint fd) { + int optval; + if (getOption(env, fd, SOL_SOCKET, SO_SNDBUF, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isTcpCork(JNIEnv *env, jclass clazz, jint fd) { + int optval; + if (getOption(env, fd, SOL_TCP, TCP_CORK, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getSoLinger(JNIEnv *env, jclass clazz, jint fd) { + struct linger optval; + if (getOption(env, fd, SOL_SOCKET, SO_LINGER, &optval, sizeof(optval)) == -1) { + return -1; + } + if (optval.l_onoff == 0) { + return -1; + } else { + return optval.l_linger; + } +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv *env, jclass clazz, jint fd) { + int optval; + if (getOption(env, fd, IPPROTO_IP, IP_TOS, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h new file mode 100644 index 0000000000..3b93bb0c42 --- /dev/null +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h @@ -0,0 +1,60 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +#include + + +#define EPOLL_READ 0x01 +#define EPOLL_WRITE 0x02 +#define EPOLL_ACCEPT 0x04 +#define EPOLL_RDHUP 0x08 + +jint Java_io_netty_channel_epoll_Native_eventFd(JNIEnv * env, jclass clazz); +void Java_io_netty_channel_epoll_Native_eventFdWrite(JNIEnv * env, jclass clazz, jint fd, jlong value); +void Java_io_netty_channel_epoll_Native_eventFdRead(JNIEnv * env, jclass clazz, jint fd); +jint Java_io_netty_channel_epoll_Native_epollCreate(JNIEnv * env, jclass clazz); +jint Java_io_netty_channel_epoll_Native_epollWait(JNIEnv * env, jclass clazz, jint efd, jlongArray events, jint timeout); +void Java_io_netty_channel_epoll_Native_epollCtlAdd(JNIEnv * env, jclass clazz, jint efd, jint fd, jint flags, jint id); +void Java_io_netty_channel_epoll_Native_epollCtlMod(JNIEnv * env, jclass clazz, jint efd, jint fd, jint flags, jint id); +void Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv * env, jclass clazz, jint efd, jint fd); +jint Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit); +jlong Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length); +jint Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit); +void JNICALL Java_io_netty_channel_epoll_Native_close(JNIEnv * env, jclass clazz, jint fd); +void Java_io_netty_channel_epoll_Native_shutdown(JNIEnv * env, jclass clazz, jint fd, jboolean read, jboolean write); +jint Java_io_netty_channel_epoll_Native_socket(JNIEnv * env, jclass clazz); +void Java_io_netty_channel_epoll_Native_bind(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port); +void Java_io_netty_channel_epoll_Native_listen(JNIEnv * env, jclass clazz, jint fd, jint backlog); +jboolean Java_io_netty_channel_epoll_Native_connect(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port); +void Java_io_netty_channel_epoll_Native_finishConnect(JNIEnv * env, jclass clazz, jint fd); +jint Java_io_netty_channel_epoll_Native_accept(JNIEnv * env, jclass clazz, jint fd); +jlong Java_io_netty_channel_epoll_Native_sendfile(JNIEnv *env, jclass clazz, jint fd, jobject fileRegion, jlong off, jlong len); +jobject Java_io_netty_channel_epoll_Native_remoteAddress(JNIEnv * env, jclass clazz, jint fd); +jobject Java_io_netty_channel_epoll_Native_localAddress(JNIEnv * env, jclass clazz, jint fd); +void Java_io_netty_channel_epoll_Native_setReuseAddress(JNIEnv * env, jclass clazz, jint fd, jint optval); +void Java_io_netty_channel_epoll_Native_setTcpNoDelay(JNIEnv *env, jclass clazz, jint fd, jint optval); +void Java_io_netty_channel_epoll_Native_setReceiveBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval); +void Java_io_netty_channel_epoll_Native_setSendBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval); +void Java_io_netty_channel_epoll_Native_setKeepAlive(JNIEnv *env, jclass clazz, jint fd, jint optval); +void Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv *env, jclass clazz, jint fd, jint optval); +void Java_io_netty_channel_epoll_Native_setSoLinger(JNIEnv *env, jclass clazz, jint fd, jint optval); +void Java_io_netty_channel_epoll_Native_setTrafficClass(JNIEnv *env, jclass clazz, jint fd, jint optval); +jint Java_io_netty_channel_epoll_Native_isReuseAddresss(JNIEnv *env, jclass clazz, jint fd); +jint Java_io_netty_channel_epoll_Native_isTcpNoDelay(JNIEnv *env, jclass clazz, jint fd); +jint Java_io_netty_channel_epoll_Native_getReceiveBufferSize(JNIEnv * env, jclass clazz, jint fd); +jint Java_io_netty_channel_epoll_Native_getSendBufferSize(JNIEnv *env, jclass clazz, jint fd); +jint Java_io_netty_channel_epoll_Native_isTcpCork(JNIEnv *env, jclass clazz, jint fd); +jint Java_io_netty_channel_epoll_Native_getSoLinger(JNIEnv *env, jclass clazz, jint fd); +jint Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv *env, jclass clazz, jint fd); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java new file mode 100644 index 0000000000..c991ce0f62 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -0,0 +1,163 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.AbstractChannel; +import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelMetadata; +import io.netty.channel.EventLoop; + +import java.io.IOException; +import java.net.InetSocketAddress; + +abstract class AbstractEpollChannel extends AbstractChannel { + private static final ChannelMetadata DATA = new ChannelMetadata(false); + private final int readFlag; + protected int flags; + protected volatile boolean active; + volatile int fd; + int id; + + AbstractEpollChannel(Channel parent, int fd, int flag) { + super(parent); + this.fd = fd; + readFlag = flag; + flags |= flag; + } + + AbstractEpollChannel(int flag) { + this(null, socketFd(), flag); + } + + private static int socketFd() { + try { + return Native.socket(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public boolean isActive() { + return active; + } + + @Override + public ChannelMetadata metadata() { + return DATA; + } + + @Override + protected void doClose() throws Exception { + active = false; + int fd = this.fd; + this.fd = -1; + Native.close(fd); + } + + @Override + public InetSocketAddress remoteAddress() { + return (InetSocketAddress) super.remoteAddress(); + } + + @Override + public InetSocketAddress localAddress() { + return (InetSocketAddress) super.localAddress(); + } + + @Override + protected void doDisconnect() throws Exception { + doClose(); + } + + @Override + protected boolean isCompatible(EventLoop loop) { + return loop instanceof EpollEventLoop; + } + + @Override + public boolean isOpen() { + return fd != -1; + } + + @Override + protected void doDeregister() throws Exception { + ((EpollEventLoop) eventLoop()).remove(this); + } + + @Override + protected void doBeginRead() throws Exception { + if ((flags & readFlag) == 0) { + flags |= readFlag; + ((EpollEventLoop) eventLoop()).modify(this); + } + } + + protected final void clearEpollIn() { + if ((flags & readFlag) != 0) { + flags = ~readFlag; + ((EpollEventLoop) eventLoop()).modify(this); + } + } + + @Override + protected void doRegister() throws Exception { + EpollEventLoop loop = (EpollEventLoop) eventLoop(); + loop.add(this); + } + + @Override + protected abstract AbstractEpollUnsafe newUnsafe(); + + protected abstract class AbstractEpollUnsafe extends AbstractUnsafe { + + /** + * Called once EPOLLIN event is ready to be processed + */ + abstract void epollInReady(); + + /** + * Called once EPOLLRDHUP event is ready to be processed + */ + void epollRdHupReady() { + // NOOP + } + + @Override + protected void flush0() { + // Flush immediately only when there's no pending flush. + // If there's a pending flush operation, event loop will call forceFlush() later, + // and thus there's no need to call it now. + if (isFlushPending()) { + return; + } + super.flush0(); + } + + /** + * Called once a EPOLLOUT event is ready to be processed + */ + void epollOutReady() { + // directly call super.flush0() to force a flush now + super.flush0(); + } + + private boolean isFlushPending() { + return (flags & Native.EPOLLOUT) != 0; + } + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java new file mode 100644 index 0000000000..8931a83cb8 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java @@ -0,0 +1,28 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.ChannelOption; + +public final class EpollChannelOption extends ChannelOption { + + public static final ChannelOption TCP_CORK = valueOf("TCP_CORK"); + + @SuppressWarnings({ "unused", "deprecation" }) + private EpollChannelOption(String name) { + super(name); + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java new file mode 100644 index 0000000000..705c1f0920 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -0,0 +1,356 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SingleThreadEventLoop; +import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe; +import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +/** + * {@link EventLoop} which uses epoll under the covers. Only works on Linux! + */ +final class EpollEventLoop extends SingleThreadEventLoop { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class); + private static final AtomicIntegerFieldUpdater WAKEN_UP_UPDATER; + + static { + AtomicIntegerFieldUpdater updater = + PlatformDependent.newAtomicIntegerFieldUpdater(EpollEventLoop.class, "wakenUp"); + if (updater == null) { + updater = AtomicIntegerFieldUpdater.newUpdater(EpollEventLoop.class, "wakenUp"); + } + WAKEN_UP_UPDATER = updater; + } + + private final int epollFd; + private final int eventFd; + private final Map ids = new HashMap(); + private final long[] events; + + private int id; + private int oldWakenUp; + private boolean overflown; + + @SuppressWarnings("unused") + private volatile int wakenUp; + private volatile int ioRatio = 50; + + EpollEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, int maxEvents) { + super(parent, threadFactory, false); + events = new long[maxEvents]; + boolean success = false; + int epollFd = -1; + int eventFd = -1; + try { + this.epollFd = epollFd = Native.epollCreate(); + this.eventFd = eventFd = Native.eventFd(); + Native.epollCtlAdd(epollFd, eventFd, Native.EPOLLIN, 0); + success = true; + } finally { + if (!success) { + if (epollFd != -1) { + try { + Native.close(epollFd); + } catch (Exception e) { + // ignore + } + } + if (eventFd != -1) { + try { + Native.close(eventFd); + } catch (Exception e) { + // ignore + } + } + } + } + } + + private int nextId() { + int id = this.id; + if (id == Integer.MAX_VALUE) { + overflown = true; + id = 0; + } + if (overflown) { + // the ids had an overflow before so we need to make sure the id is not in use atm before assign + // it. + for (;;) { + if (!ids.containsKey(++id)) { + this.id = id; + break; + } + } + } else { + this.id = ++id; + } + return id; + } + + @Override + protected void wakeup(boolean inEventLoop) { + if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) { + // write to the evfd which will then wake-up epoll_wait(...) + Native.eventFdWrite(eventFd, 1L); + } + } + + /** + * Register the given epoll with this {@link io.netty.channel.EventLoop}. + */ + void add(AbstractEpollChannel ch) { + assert inEventLoop(); + int id = nextId(); + Native.epollCtlAdd(epollFd, ch.fd, ch.flags, id); + ch.id = id; + ids.put(id, ch); + } + + /** + * The flags of the given epoll was modified so update the registration + */ + void modify(AbstractEpollChannel ch) { + assert inEventLoop(); + Native.epollCtlMod(epollFd, ch.fd, ch.flags, ch.id); + } + + /** + * Deregister the given epoll from this {@link io.netty.channel.EventLoop}. + */ + void remove(AbstractEpollChannel ch) { + assert inEventLoop(); + if (ids.remove(ch.id) != null && ch.isOpen()) { + // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically + // removed once the file-descriptor is closed. + Native.epollCtlDel(epollFd, ch.fd); + } + } + + @Override + protected Queue newTaskQueue() { + // This event loop never calls takeTask() + return new ConcurrentLinkedQueue(); + } + + /** + * Returns the percentage of the desired amount of time spent for I/O in the event loop. + */ + public int getIoRatio() { + return ioRatio; + } + + /** + * Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is + * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks. + */ + public void setIoRatio(int ioRatio) { + if (ioRatio <= 0 || ioRatio > 100) { + throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)"); + } + this.ioRatio = ioRatio; + } + + private int epollWait() { + int selectCnt = 0; + long currentTimeNanos = System.nanoTime(); + long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); + for (;;) { + long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; + if (timeoutMillis <= 0) { + if (selectCnt == 0) { + int ready = Native.epollWait(epollFd, events, 0); + if (ready > 0) { + return ready; + } + } + break; + } + + int selectedKeys = Native.epollWait(epollFd, events, (int) timeoutMillis); + selectCnt ++; + + if (selectedKeys != 0 || oldWakenUp == 1 || wakenUp == 1 || hasTasks()) { + // Selected something, + // waken up by user, or + // the task queue has a pending task. + return selectedKeys; + } + currentTimeNanos = System.nanoTime(); + } + return 0; + } + + @Override + protected void run() { + for (;;) { + oldWakenUp = WAKEN_UP_UPDATER.getAndSet(this, 0); + try { + int ready; + if (hasTasks()) { + // Non blocking just return what is ready directly without block + ready = Native.epollWait(epollFd, events, 0); + } else { + ready = epollWait(); + + // 'wakenUp.compareAndSet(false, true)' is always evaluated + // before calling 'selector.wakeup()' to reduce the wake-up + // overhead. (Selector.wakeup() is an expensive operation.) + // + // However, there is a race condition in this approach. + // The race condition is triggered when 'wakenUp' is set to + // true too early. + // + // 'wakenUp' is set to true too early if: + // 1) Selector is waken up between 'wakenUp.set(false)' and + // 'selector.select(...)'. (BAD) + // 2) Selector is waken up between 'selector.select(...)' and + // 'if (wakenUp.get()) { ... }'. (OK) + // + // In the first case, 'wakenUp' is set to true and the + // following 'selector.select(...)' will wake up immediately. + // Until 'wakenUp' is set to false again in the next round, + // 'wakenUp.compareAndSet(false, true)' will fail, and therefore + // any attempt to wake up the Selector will fail, too, causing + // the following 'selector.select(...)' call to block + // unnecessarily. + // + // To fix this problem, we wake up the selector again if wakenUp + // is true immediately after selector.select(...). + // It is inefficient in that it wakes up the selector for both + // the first case (BAD - wake-up required) and the second case + // (OK - no wake-up required). + + if (wakenUp == 1) { + Native.eventFdWrite(eventFd, 1L); + } + } + + final int ioRatio = this.ioRatio; + if (ioRatio == 100) { + if (ready > 0) { + processReady(events, ready); + } + runAllTasks(); + } else { + final long ioStartTime = System.nanoTime(); + + if (ready > 0) { + processReady(events, ready); + } + + final long ioTime = System.nanoTime() - ioStartTime; + runAllTasks(ioTime * (100 - ioRatio) / ioRatio); + } + + if (isShuttingDown()) { + closeAll(); + if (confirmShutdown()) { + break; + } + } + } catch (Throwable t) { + logger.warn("Unexpected exception in the selector loop.", t); + + // Prevent possible consecutive immediate failures that lead to + // excessive CPU consumption. + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore. + } + } + } + } + + private void closeAll() { + int ready = Native.epollWait(epollFd, events, 0); + Collection channels = new ArrayList(ready); + + for (int i = 0; i < ready; i++) { + final long ev = events[i]; + + int id = (int) (ev >> 32L); + AbstractEpollChannel ch = ids.get(id); + if (ch != null) { + channels.add(ids.get(id)); + } + } + + for (AbstractEpollChannel ch: channels) { + ch.unsafe().close(ch.unsafe().voidPromise()); + } + } + + private void processReady(long[] events, int ready) { + for (int i = 0; i < ready; i ++) { + final long ev = events[i]; + + int id = (int) (ev >> 32L); + if (id == 0) { + // consume wakeup event + Native.eventFdRead(eventFd); + } else { + boolean read = (ev & Native.EPOLLIN) != 0; + boolean write = (ev & Native.EPOLLOUT) != 0; + boolean close = (ev & Native.EPOLLRDHUP) != 0; + + AbstractEpollChannel ch = ids.get(id); + if (ch != null) { + AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe(); + if (write) { + // force flush of data as the epoll is writable again + unsafe.epollOutReady(); + } + if (read) { + // Something is ready to read, so consume it now + unsafe.epollInReady(); + } + if (close) { + unsafe.epollRdHupReady(); + } + } + } + } + } + + @Override + protected void cleanup() { + try { + Native.close(epollFd); + } catch (IOException e) { + logger.warn("Failed to close the epoll fd.", e); + } + try { + Native.close(eventFd); + } catch (IOException e) { + logger.warn("Failed to close the event fd.", e); + } + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java new file mode 100644 index 0000000000..f702417c5c --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java @@ -0,0 +1,73 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.util.concurrent.EventExecutor; + +import java.util.concurrent.ThreadFactory; + +/** + * {@link EventLoopGroup} which uses epoll under the covers. Because of this + * it only works on linux. + */ +public final class EpollEventLoopGroup extends MultithreadEventLoopGroup { + + /** + * Create a new instance using the default number of threads and the default {@link ThreadFactory}. + */ + public EpollEventLoopGroup() { + this(0); + } + + /** + * Create a new instance using the specified number of threads and the default {@link ThreadFactory}. + */ + public EpollEventLoopGroup(int nThreads) { + this(nThreads, null); + } + + /** + * Create a new instance using the specified number of threads and the given {@link ThreadFactory}. + */ + public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory) { + this(nThreads, threadFactory, 128); + } + + /** + * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given + * maximal amount of epoll events to handle per epollWait(...). + */ + public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce) { + super(nThreads, threadFactory, maxEventsAtOnce); + } + + /** + * Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is + * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks. + */ + public void setIoRatio(int ioRatio) { + for (EventExecutor e: children()) { + ((EpollEventLoop) e).setIoRatio(ioRatio); + } + } + + @Override + protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { + return new EpollEventLoop(this, threadFactory, (Integer) args[0]); + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java new file mode 100644 index 0000000000..54a5369dfd --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java @@ -0,0 +1,124 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.ServerSocketChannelConfig; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +/** + * {@link ServerSocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for + * maximal performance. + */ +public final class EpollServerSocketChannel extends AbstractEpollChannel implements ServerSocketChannel { + + private final EpollServerSocketChannelConfig config; + private volatile InetSocketAddress local; + + public EpollServerSocketChannel() { + super(Native.EPOLLACCEPT); + config = new EpollServerSocketChannelConfig(this); + } + + @Override + protected boolean isCompatible(EventLoop loop) { + return loop instanceof EpollEventLoop; + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + InetSocketAddress addr = (InetSocketAddress) localAddress; + Native.bind(fd, addr.getAddress(), addr.getPort()); + local = addr; + Native.listen(fd, config.getBacklog()); + active = true; + } + + @Override + public ServerSocketChannelConfig config() { + return config; + } + + @Override + protected InetSocketAddress localAddress0() { + return local; + } + + @Override + protected InetSocketAddress remoteAddress0() { + return null; + } + + @Override + protected AbstractEpollUnsafe newUnsafe() { + return new EpollServerSocketUnsafe(); + } + + @Override + protected void doWrite(ChannelOutboundBuffer in) { + throw new UnsupportedOperationException(); + } + + final class EpollServerSocketUnsafe extends AbstractEpollUnsafe { + @Override + public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) { + // Connect not supported by ServerChannel implementations + channelPromise.setFailure(new UnsupportedOperationException()); + } + + @Override + void epollInReady() { + assert eventLoop().inEventLoop(); + try { + final ChannelPipeline pipeline = pipeline(); + Throwable exception = null; + try { + for (;;) { + int socketFd = Native.accept(fd); + if (socketFd == -1) { + // this means everything was handled for now + break; + } + try { + pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, socketFd)); + } catch (Throwable t) { + // keep on reading as we use epoll ET and need to consume everything from the socket + pipeline.fireChannelReadComplete(); + pipeline.fireExceptionCaught(t); + } + } + } catch (Throwable t) { + exception = t; + } + pipeline.fireChannelReadComplete(); + + if (exception != null) { + pipeline.fireExceptionCaught(exception); + } + } finally { + if (!config().isAutoRead()) { + clearEpollIn(); + } + } + } + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java new file mode 100644 index 0000000000..791a65b46a --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java @@ -0,0 +1,176 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelOption; +import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.MessageSizeEstimator; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.socket.ServerSocketChannelConfig; +import io.netty.util.NetUtil; + +import java.util.Map; + +import static io.netty.channel.ChannelOption.SO_BACKLOG; +import static io.netty.channel.ChannelOption.SO_RCVBUF; +import static io.netty.channel.ChannelOption.SO_REUSEADDR; + +final class EpollServerSocketChannelConfig extends DefaultChannelConfig + implements ServerSocketChannelConfig { + + private final EpollServerSocketChannel channel; + private volatile int backlog = NetUtil.SOMAXCONN; + + EpollServerSocketChannelConfig(EpollServerSocketChannel channel) { + super(channel); + this.channel = channel; + } + + @Override + public Map, Object> getOptions() { + return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG); + } + + @SuppressWarnings("unchecked") + @Override + public T getOption(ChannelOption option) { + if (option == SO_RCVBUF) { + return (T) Integer.valueOf(getReceiveBufferSize()); + } + if (option == SO_REUSEADDR) { + return (T) Boolean.valueOf(isReuseAddress()); + } + if (option == SO_BACKLOG) { + return (T) Integer.valueOf(getBacklog()); + } + + return super.getOption(option); + } + + @Override + public boolean setOption(ChannelOption option, T value) { + validate(option, value); + + if (option == SO_RCVBUF) { + setReceiveBufferSize((Integer) value); + } else if (option == SO_REUSEADDR) { + setReuseAddress((Boolean) value); + } else if (option == SO_BACKLOG) { + setBacklog((Integer) value); + } else { + return super.setOption(option, value); + } + + return true; + } + + @Override + public boolean isReuseAddress() { + return Native.isReuseAddress(channel.fd) == 1; + } + + @Override + public ServerSocketChannelConfig setReuseAddress(boolean reuseAddress) { + Native.setReuseAddress(channel.fd, reuseAddress ? 1 : 0); + return this; + } + + @Override + public int getReceiveBufferSize() { + return Native.getReceiveBufferSize(channel.fd); + } + + @Override + public ServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) { + Native.setReceiveBufferSize(channel.fd, receiveBufferSize); + + return this; + } + + @Override + public ServerSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth) { + return this; + } + + @Override + public int getBacklog() { + return backlog; + } + + @Override + public ServerSocketChannelConfig setBacklog(int backlog) { + if (backlog < 0) { + throw new IllegalArgumentException("backlog: " + backlog); + } + this.backlog = backlog; + return this; + } + + @Override + public ServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { + super.setConnectTimeoutMillis(connectTimeoutMillis); + return this; + } + + @Override + public ServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { + super.setMaxMessagesPerRead(maxMessagesPerRead); + return this; + } + + @Override + public ServerSocketChannelConfig setWriteSpinCount(int writeSpinCount) { + super.setWriteSpinCount(writeSpinCount); + return this; + } + + @Override + public ServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) { + super.setAllocator(allocator); + return this; + } + + @Override + public ServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { + super.setRecvByteBufAllocator(allocator); + return this; + } + + @Override + public ServerSocketChannelConfig setAutoRead(boolean autoRead) { + super.setAutoRead(autoRead); + return this; + } + + @Override + public ServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { + super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); + return this; + } + + @Override + public ServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { + super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); + return this; + } + + @Override + public ServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { + super.setMessageSizeEstimator(estimator); + return this; + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java new file mode 100644 index 0000000000..a9af87cfdf --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -0,0 +1,590 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.ConnectTimeoutException; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.EventLoop; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.socket.ChannelInputShutdownEvent; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.util.internal.StringUtil; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * {@link SocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for + * maximal performance. + */ +public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel { + private final EpollSocketChannelConfig config; + + /** + * The future of the current connection attempt. If not null, subsequent + * connection attempts will fail. + */ + private ChannelPromise connectPromise; + private ScheduledFuture connectTimeoutFuture; + private SocketAddress requestedRemoteAddress; + + private volatile boolean inputShutdown; + private volatile boolean outputShutdown; + + EpollSocketChannel(Channel parent, int fd) { + super(parent, fd, Native.EPOLLIN); + active = true; + config = new EpollSocketChannelConfig(this); + } + + public EpollSocketChannel() { + super(Native.EPOLLIN); + config = new EpollSocketChannelConfig(this); + } + + @Override + protected AbstractEpollUnsafe newUnsafe() { + return new EpollSocketUnsafe(); + } + + @Override + protected SocketAddress localAddress0() { + return Native.localAddress(fd); + } + + @Override + protected SocketAddress remoteAddress0() { + return Native.remoteAddress(fd); + } + + @Override + protected void doBind(SocketAddress local) throws Exception { + InetSocketAddress localAddress = (InetSocketAddress) local; + Native.bind(fd, localAddress.getAddress(), localAddress.getPort()); + } + + private void setEpollOut() { + if ((flags & Native.EPOLLOUT) == 0) { + flags |= Native.EPOLLOUT; + ((EpollEventLoop) eventLoop()).modify(this); + } + } + + private void clearEpollOut() { + if ((flags & Native.EPOLLOUT) != 0) { + flags = ~Native.EPOLLOUT; + ((EpollEventLoop) eventLoop()).modify(this); + } + } + + /** + * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. + * @param buf the {@link ByteBuf} from which the bytes should be written + * @return amount the amount of written bytes + */ + private int doWriteBytes(ByteBuf buf, int readable) throws Exception { + int readerIndex = buf.readerIndex(); + int localFlushedAmount; + if (buf.nioBufferCount() == 1) { + ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, readable); + localFlushedAmount = Native.write(fd, nioBuf, nioBuf.position(), nioBuf.limit()); + } else { + // backed by more then one buffer, do a gathering write... + ByteBuffer[] nioBufs = buf.nioBuffers(); + localFlushedAmount = (int) Native.writev(fd, nioBufs, 0, nioBufs.length); + } + if (localFlushedAmount > 0) { + buf.readerIndex(readerIndex + localFlushedAmount); + } + return localFlushedAmount; + } + + /** + * Write a {@link DefaultFileRegion} + * + * @param region the {@link DefaultFileRegion} from which the bytes should be written + * @return amount the amount of written bytes + */ + private long doWriteFileRegion(DefaultFileRegion region, long count) throws Exception { + return Native.sendfile(fd, region, region.transfered(), count); + } + + @Override + protected void doWrite(ChannelOutboundBuffer in) throws Exception { + for (;;) { + final int msgCount = in.size(); + + if (msgCount == 0) { + // Wrote all messages. + clearEpollOut(); + break; + } + // Do non-gathering write for a single buffer case. + if (msgCount > 1) { + // Ensure the pending writes are made of ByteBufs only. + ByteBuffer[] nioBuffers = in.nioBuffers(); + if (nioBuffers != null) { + + int nioBufferCnt = in.nioBufferCount(); + long expectedWrittenBytes = in.nioBufferSize(); + + long localWrittenBytes = Native.writev(fd, nioBuffers, 0, nioBufferCnt); + + if (localWrittenBytes < expectedWrittenBytes) { + int nioBufIndex = 0; + setEpollOut(); + + // Did not write all buffers completely. + // Release the fully written buffers and update the indexes of the partially written buffer. + for (int i = msgCount; i > 0; i --) { + final ByteBuf buf = (ByteBuf) in.current(); + final int readerIndex = buf.readerIndex(); + final int readableBytes = buf.writerIndex() - readerIndex; + + if (readableBytes < localWrittenBytes) { + nioBufIndex += buf.nioBufferCount(); + in.remove(); + localWrittenBytes -= readableBytes; + } else if (readableBytes > localWrittenBytes) { + + buf.readerIndex(readerIndex + (int) localWrittenBytes); + in.progress(localWrittenBytes); + + // update position in ByteBuffer as we not do this in the native methods + ByteBuffer bb = nioBuffers[nioBufIndex]; + bb.position(bb.position() + (int) localWrittenBytes); + break; + } else { // readable == writtenBytes + in.remove(); + break; + } + } + } else { + // Release all buffers + for (int i = msgCount; i > 0; i --) { + in.remove(); + } + } + // try again as a ChannelFuture may be notified in the meantime and triggered another flush + continue; + } + } + + Object msg = in.current(); + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + int readableBytes = buf.readableBytes(); + if (readableBytes == 0) { + in.remove(); + continue; + } + + int expected = buf.readableBytes(); + int localFlushedAmount = doWriteBytes(buf, expected); + in.progress(localFlushedAmount); + if (localFlushedAmount < expected) { + setEpollOut(); + break; + } + if (!buf.isReadable()) { + in.remove(); + } + + } else if (msg instanceof DefaultFileRegion) { + DefaultFileRegion region = (DefaultFileRegion) msg; + + long expected = region.count() - region.position(); + long localFlushedAmount = doWriteFileRegion(region, expected); + in.progress(localFlushedAmount); + + if (localFlushedAmount < expected) { + setEpollOut(); + break; + } + + if (region.transfered() >= region.count()) { + in.remove(); + } + } else { + throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg)); + } + } + } + + @Override + public EpollSocketChannelConfig config() { + return config; + } + + @Override + public boolean isInputShutdown() { + return inputShutdown; + } + + @Override + public boolean isOutputShutdown() { + return outputShutdown || !isActive(); + } + + @Override + public ChannelFuture shutdownOutput() { + return shutdownOutput(newPromise()); + } + + @Override + public ChannelFuture shutdownOutput(final ChannelPromise promise) { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + try { + Native.shutdown(fd, false, true); + outputShutdown = true; + promise.setSuccess(); + } catch (Throwable t) { + promise.setFailure(t); + } + } else { + loop.execute(new Runnable() { + @Override + public void run() { + shutdownOutput(promise); + } + }); + } + return promise; + } + + @Override + public ServerSocketChannel parent() { + return (ServerSocketChannel) super.parent(); + } + + final class EpollSocketUnsafe extends AbstractEpollUnsafe { + private RecvByteBufAllocator.Handle allocHandle; + + @Override + public void write(Object msg, ChannelPromise promise) { + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + if (!buf.isDirect()) { + // We can only handle direct buffers so we need to copy if a non direct is + // passed to write. + int readable = buf.readableBytes(); + ByteBuf dst = alloc().directBuffer(readable); + dst.writeBytes(buf, buf.readerIndex(), readable); + + buf.release(); + msg = dst; + } + } + super.write(msg, promise); + } + + private void closeOnRead(ChannelPipeline pipeline) { + inputShutdown = true; + if (isOpen()) { + if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { + clearEpollIn(); + pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); + } else { + close(voidPromise()); + } + } + } + + private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { + if (byteBuf != null) { + if (byteBuf.isReadable()) { + pipeline.fireChannelRead(byteBuf); + } else { + byteBuf.release(); + } + } + pipeline.fireChannelReadComplete(); + pipeline.fireExceptionCaught(cause); + if (close || cause instanceof IOException) { + closeOnRead(pipeline); + return true; + } + return false; + } + + @Override + public void connect( + final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { + if (!promise.setUncancellable() || !ensureOpen(promise)) { + return; + } + + try { + if (connectPromise != null) { + throw new IllegalStateException("connection attempt already made"); + } + + boolean wasActive = isActive(); + if (doConnect((InetSocketAddress) remoteAddress, (InetSocketAddress) localAddress)) { + fulfillConnectPromise(promise, wasActive); + } else { + connectPromise = promise; + requestedRemoteAddress = remoteAddress; + + // Schedule connect timeout. + int connectTimeoutMillis = config().getConnectTimeoutMillis(); + if (connectTimeoutMillis > 0) { + connectTimeoutFuture = eventLoop().schedule(new Runnable() { + @Override + public void run() { + ChannelPromise connectPromise = EpollSocketChannel.this.connectPromise; + ConnectTimeoutException cause = + new ConnectTimeoutException("connection timed out: " + remoteAddress); + if (connectPromise != null && connectPromise.tryFailure(cause)) { + close(voidPromise()); + } + } + }, connectTimeoutMillis, TimeUnit.MILLISECONDS); + } + + promise.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isCancelled()) { + if (connectTimeoutFuture != null) { + connectTimeoutFuture.cancel(false); + } + connectPromise = null; + close(voidPromise()); + } + } + }); + } + } catch (Throwable t) { + if (t instanceof ConnectException) { + Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress); + newT.setStackTrace(t.getStackTrace()); + t = newT; + } + closeIfClosed(); + promise.tryFailure(t); + } + } + + private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { + if (promise == null) { + // Closed via cancellation and the promise has been notified already. + return; + } + active = true; + + // trySuccess() will return false if a user cancelled the connection attempt. + boolean promiseSet = promise.trySuccess(); + + // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, + // because what happened is what happened. + if (!wasActive && isActive()) { + pipeline().fireChannelActive(); + } + + // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). + if (!promiseSet) { + close(voidPromise()); + } + } + + private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) { + if (promise == null) { + // Closed via cancellation and the promise has been notified already. + } + + // Use tryFailure() instead of setFailure() to avoid the race against cancel(). + promise.tryFailure(cause); + closeIfClosed(); + } + + private void finishConnect() { + // Note this method is invoked by the event loop only if the connection attempt was + // neither cancelled nor timed out. + + assert eventLoop().inEventLoop(); + + try { + boolean wasActive = isActive(); + doFinishConnect(); + fulfillConnectPromise(connectPromise, wasActive); + } catch (Throwable t) { + if (t instanceof ConnectException) { + Throwable newT = new ConnectException(t.getMessage() + ": " + requestedRemoteAddress); + newT.setStackTrace(t.getStackTrace()); + t = newT; + } + + fulfillConnectPromise(connectPromise, t); + } finally { + // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used + // See https://github.com/netty/netty/issues/1770 + if (connectTimeoutFuture != null) { + connectTimeoutFuture.cancel(false); + } + connectPromise = null; + } + } + + @Override + void epollOutReady() { + if (connectPromise != null) { + // pending connect which is now complete so handle it. + finishConnect(); + } else { + super.epollOutReady(); + } + } + + /** + * Connect to the remote peer + */ + private boolean doConnect(InetSocketAddress remoteAddress, InetSocketAddress localAddress) throws Exception { + if (localAddress != null) { + Native.bind(fd, localAddress.getAddress(), localAddress.getPort()); + } + + boolean success = false; + try { + boolean connected = Native.connect(fd, remoteAddress.getAddress(), + remoteAddress.getPort()); + if (!connected) { + setEpollOut(); + } + success = true; + return connected; + } finally { + if (!success) { + doClose(); + } + } + } + + /** + * Finish the connect + */ + private void doFinishConnect() throws Exception { + Native.finishConnect(fd); + clearEpollOut(); + } + + /** + * Read bytes into the given {@link ByteBuf} and return the amount. + */ + private int doReadBytes(ByteBuf byteBuf) throws Exception { + ByteBuffer buf = byteBuf.internalNioBuffer(0, byteBuf.writableBytes()); + int localReadAmount = Native.read(fd, buf, buf.position(), buf.limit()); + if (localReadAmount > 0) { + byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount); + } + return localReadAmount; + } + + @Override + void epollRdHupReady() { + closeOnRead(pipeline()); + } + + @Override + void epollInReady() { + final ChannelConfig config = config(); + final ChannelPipeline pipeline = pipeline(); + final ByteBufAllocator allocator = config.getAllocator(); + RecvByteBufAllocator.Handle allocHandle = this.allocHandle; + if (allocHandle == null) { + this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); + } + + ByteBuf byteBuf = null; + boolean close = false; + try { + int byteBufCapacity = allocHandle.guess(); + int totalReadAmount = 0; + for (;;) { + // we use a direct buffer here as the native implementations only be able + // to handle direct buffers. + byteBuf = allocator.directBuffer(byteBufCapacity); + int writable = byteBuf.writableBytes(); + int localReadAmount = doReadBytes(byteBuf); + if (localReadAmount <= 0) { + // not was read release the buffer + byteBuf.release(); + close = localReadAmount < 0; + break; + } + pipeline.fireChannelRead(byteBuf); + byteBuf = null; + + if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { + allocHandle.record(totalReadAmount); + + // Avoid overflow. + totalReadAmount = localReadAmount; + } else { + totalReadAmount += localReadAmount; + } + + if (localReadAmount < writable) { + // Read less than what the buffer can hold, + // which might mean we drained the recv buffer completely. + break; + } + } + + pipeline.fireChannelReadComplete(); + allocHandle.record(totalReadAmount); + + if (close) { + closeOnRead(pipeline); + close = false; + } + } catch (Throwable t) { + boolean closed = handleReadException(pipeline, byteBuf, t, close); + if (!closed) { + // trigger a read again as there may be something left to read and because of epoll ET we + // will not get notified again until we read everything from the socket + eventLoop().execute(new Runnable() { + @Override + public void run() { + epollInReady(); + } + }); + } + } finally { + if (!config.isAutoRead()) { + clearEpollIn(); + } + } + } + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java new file mode 100644 index 0000000000..cab8ac0e4b --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java @@ -0,0 +1,280 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelOption; +import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.MessageSizeEstimator; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.socket.SocketChannelConfig; +import io.netty.util.internal.PlatformDependent; + +import java.util.Map; + +import static io.netty.channel.ChannelOption.*; + +public final class EpollSocketChannelConfig extends DefaultChannelConfig + implements SocketChannelConfig { + + protected final EpollSocketChannel channel; + private volatile boolean allowHalfClosure; + + /** + * Creates a new instance. + */ + EpollSocketChannelConfig(EpollSocketChannel channel) { + super(channel); + + this.channel = channel; + if (PlatformDependent.canEnableTcpNoDelayByDefault()) { + setTcpNoDelay(true); + } + } + + @Override + public Map, Object> getOptions() { + return getOptions( + super.getOptions(), + SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS, + ALLOW_HALF_CLOSURE, EpollChannelOption.TCP_CORK); + } + + @SuppressWarnings("unchecked") + @Override + public T getOption(ChannelOption option) { + if (option == SO_RCVBUF) { + return (T) Integer.valueOf(getReceiveBufferSize()); + } + if (option == SO_SNDBUF) { + return (T) Integer.valueOf(getSendBufferSize()); + } + if (option == TCP_NODELAY) { + return (T) Boolean.valueOf(isTcpNoDelay()); + } + if (option == SO_KEEPALIVE) { + return (T) Boolean.valueOf(isKeepAlive()); + } + if (option == SO_REUSEADDR) { + return (T) Boolean.valueOf(isReuseAddress()); + } + if (option == SO_LINGER) { + return (T) Integer.valueOf(getSoLinger()); + } + if (option == IP_TOS) { + return (T) Integer.valueOf(getTrafficClass()); + } + if (option == ALLOW_HALF_CLOSURE) { + return (T) Boolean.valueOf(isAllowHalfClosure()); + } + if (option == EpollChannelOption.TCP_CORK) { + return (T) Boolean.valueOf(isTcpCork()); + } + return super.getOption(option); + } + + @Override + public boolean setOption(ChannelOption option, T value) { + validate(option, value); + + if (option == SO_RCVBUF) { + setReceiveBufferSize((Integer) value); + } else if (option == SO_SNDBUF) { + setSendBufferSize((Integer) value); + } else if (option == TCP_NODELAY) { + setTcpNoDelay((Boolean) value); + } else if (option == SO_KEEPALIVE) { + setKeepAlive((Boolean) value); + } else if (option == SO_REUSEADDR) { + setReuseAddress((Boolean) value); + } else if (option == SO_LINGER) { + setSoLinger((Integer) value); + } else if (option == IP_TOS) { + setTrafficClass((Integer) value); + } else if (option == ALLOW_HALF_CLOSURE) { + setAllowHalfClosure((Boolean) value); + } else if (option == EpollChannelOption.TCP_CORK) { + setTcpCork((Boolean) value); + } else { + return super.setOption(option, value); + } + + return true; + } + + @Override + public int getReceiveBufferSize() { + return Native.getReceiveBufferSize(channel.fd); + } + + @Override + public int getSendBufferSize() { + return Native.getSendBufferSize(channel.fd); + } + + @Override + public int getSoLinger() { + return Native.getSoLinger(channel.fd); + } + + @Override + public int getTrafficClass() { + return Native.getTrafficClass(channel.fd); + } + + @Override + public boolean isKeepAlive() { + return Native.isKeepAlive(channel.fd) == 1; + } + + @Override + public boolean isReuseAddress() { + return Native.isReuseAddress(channel.fd) == 1; + } + + @Override + public boolean isTcpNoDelay() { + return Native.isTcpNoDelay(channel.fd) == 1; + } + + public boolean isTcpCork() { + return Native.isTcpCork(channel.fd) == 1; + } + + @Override + public EpollSocketChannelConfig setKeepAlive(boolean keepAlive) { + Native.setKeepAlive(channel.fd, keepAlive ? 1 : 0); + return this; + } + + @Override + public EpollSocketChannelConfig setPerformancePreferences( + int connectionTime, int latency, int bandwidth) { + return this; + } + + @Override + public EpollSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) { + Native.setReceiveBufferSize(channel.fd, receiveBufferSize); + return this; + } + + @Override + public EpollSocketChannelConfig setReuseAddress(boolean reuseAddress) { + Native.setReuseAddress(channel.fd, reuseAddress ? 1 : 0); + return this; + } + + @Override + public EpollSocketChannelConfig setSendBufferSize(int sendBufferSize) { + Native.setSendBufferSize(channel.fd, sendBufferSize); + return this; + } + + @Override + public EpollSocketChannelConfig setSoLinger(int soLinger) { + Native.setSoLinger(channel.fd, soLinger); + return this; + } + + @Override + public EpollSocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) { + Native.setTcpNoDelay(channel.fd, tcpNoDelay ? 1 : 0); + return this; + } + + public EpollSocketChannelConfig setTcpCork(boolean tcpCork) { + Native.setTcpCork(channel.fd, tcpCork ? 1 : 0); + return this; + } + + @Override + public EpollSocketChannelConfig setTrafficClass(int trafficClass) { + Native.setTrafficClass(channel.fd, trafficClass); + return this; + } + + @Override + public boolean isAllowHalfClosure() { + return allowHalfClosure; + } + + @Override + public EpollSocketChannelConfig setAllowHalfClosure(boolean allowHalfClosure) { + this.allowHalfClosure = allowHalfClosure; + return this; + } + + @Override + public EpollSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { + super.setConnectTimeoutMillis(connectTimeoutMillis); + return this; + } + + @Override + public EpollSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { + super.setMaxMessagesPerRead(maxMessagesPerRead); + return this; + } + + @Override + public EpollSocketChannelConfig setWriteSpinCount(int writeSpinCount) { + super.setWriteSpinCount(writeSpinCount); + return this; + } + + @Override + public EpollSocketChannelConfig setAllocator(ByteBufAllocator allocator) { + super.setAllocator(allocator); + return this; + } + + @Override + public EpollSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { + super.setRecvByteBufAllocator(allocator); + return this; + } + + @Override + public EpollSocketChannelConfig setAutoRead(boolean autoRead) { + super.setAutoRead(autoRead); + return this; + } + + @Override + public EpollSocketChannelConfig setAutoClose(boolean autoClose) { + super.setAutoClose(autoClose); + return this; + } + + @Override + public EpollSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { + super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); + return this; + } + + @Override + public EpollSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { + super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); + return this; + } + + @Override + public EpollSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { + super.setMessageSizeEstimator(estimator); + return this; + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java new file mode 100644 index 0000000000..a45ea67b3a --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -0,0 +1,136 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + + +import io.netty.channel.DefaultFileRegion; +import io.netty.util.internal.NativeLibraryLoader; + +import java.io.IOException; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.Locale; + +/** + * Native helper methods + * + * Internal usage only! + */ +final class Native { + private static final byte[] IPV4_MAPPED_IPV6_PREFIX = { + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, (byte) 0xff, (byte) 0xff }; + + static { + String name = System.getProperty("os.name").toLowerCase(Locale.UK).trim(); + if (!name.startsWith("linux")) { + throw new IllegalStateException("Only supported on Linux"); + } + NativeLibraryLoader.load("netty-transport-native-epoll", Native.class.getClassLoader()); + } + + // EventLoop operations and constants + public static final int EPOLLIN = 0x01; + public static final int EPOLLOUT = 0x02; + public static final int EPOLLACCEPT = 0x04; + public static final int EPOLLRDHUP = 0x08; + + public static native int eventFd(); + public static native void eventFdWrite(int fd, long value); + public static native void eventFdRead(int fd); + public static native int epollCreate(); + public static native int epollWait(int efd, long[] events, int timeout); + public static native void epollCtlAdd(int efd, final int fd, final int flags, final int id); + public static native void epollCtlMod(int efd, final int fd, final int flags, final int id); + public static native void epollCtlDel(int efd, final int fd); + + // File-descriptor operations + public static native void close(int fd) throws IOException; + public static native int write(int fd, ByteBuffer buf, int pos, int limit) throws IOException; + public static native long writev(int fd, ByteBuffer[] buffers, int offset, int length) throws IOException; + public static native int read(int fd, ByteBuffer buf, int pos, int limit) throws IOException; + public static native long sendfile(int dest, DefaultFileRegion src, long offset, long length) throws IOException; + + // socket operations + public static native int socket() throws IOException; + public static void bind(int fd, InetAddress addr, int port) throws IOException { + byte[] address; + int scopeId; + if (addr instanceof Inet6Address) { + address = addr.getAddress(); + scopeId = ((Inet6Address) addr).getScopeId(); + } else { + // convert to ipv4 mapped ipv6 address; + scopeId = 0; + address = ipv4MappedIpv6Address(addr.getAddress()); + } + bind(fd, address, scopeId, port); + } + + private static byte[] ipv4MappedIpv6Address(byte[] ipv4) { + byte[] address = new byte[16]; + System.arraycopy(IPV4_MAPPED_IPV6_PREFIX, 0, address, 0, IPV4_MAPPED_IPV6_PREFIX.length); + System.arraycopy(ipv4, 0, address, 12, ipv4.length); + return address; + } + + public static native void bind(int fd, byte[] address, int scopeId, int port) throws IOException; + public static native void listen(int fd, int backlog) throws IOException; + public static boolean connect(int fd, InetAddress addr, int port) throws IOException { + byte[] address; + int scopeId; + if (addr instanceof Inet6Address) { + address = addr.getAddress(); + scopeId = ((Inet6Address) addr).getScopeId(); + } else { + // convert to ipv4 mapped ipv6 address; + scopeId = 0; + address = ipv4MappedIpv6Address(addr.getAddress()); + } + return connect(fd, address, scopeId, port); + } + public static native boolean connect(int fd, byte[] address, int scopeId, int port) throws IOException; + public static native void finishConnect(int fd) throws IOException; + + public static native InetSocketAddress remoteAddress(int fd); + public static native InetSocketAddress localAddress(int fd); + public static native int accept(int fd) throws IOException; + public static native void shutdown(int fd, boolean read, boolean write) throws IOException; + + // Socket option operations + public static native int getReceiveBufferSize(int fd); + public static native int getSendBufferSize(int fd); + public static native int isKeepAlive(int fd); + public static native int isReuseAddress(int fd); + public static native int isTcpNoDelay(int fd); + public static native int isTcpCork(int fd); + public static native int getSoLinger(int fd); + public static native int getTrafficClass(int fd); + + public static native void setKeepAlive(int fd, int keepAlive); + public static native void setReceiveBufferSize(int fd, int receiveBufferSize); + public static native void setReuseAddress(int fd, int reuseAddress); + public static native void setSendBufferSize(int fd, int sendBufferSize); + public static native void setTcpNoDelay(int fd, int tcpNoDelay); + public static native void setTcpCork(int fd, int tcpCork); + public static native void setSoLinger(int fd, int soLinger); + public static native void setTrafficClass(int fd, int tcpNoDelay); + + private Native() { + // utility + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/package-info.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/package-info.java new file mode 100644 index 0000000000..338a666c53 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * Optimized transport for linux which uses EPOLL Edge-Triggered Mode + * for maximal performance. + */ +package io.netty.channel.epoll; diff --git a/transport-native-epoll/src/main/native-package/m4/custom.m4 b/transport-native-epoll/src/main/native-package/m4/custom.m4 new file mode 100644 index 0000000000..c3ffe7d167 --- /dev/null +++ b/transport-native-epoll/src/main/native-package/m4/custom.m4 @@ -0,0 +1,47 @@ +dnl --------------------------------------------------------------------------- +dnl Copyright 2014 The Netty Project +dnl +dnl Licensed under the Apache License, Version 2.0 (the "License"); +dnl you may not use this file except in compliance with the License. +dnl You may obtain a copy of the License at +dnl +dnl http://www.apache.org/licenses/LICENSE-2.0 +dnl +dnl Unless required by applicable law or agreed to in writing, software +dnl distributed under the License is distributed on an "AS IS" BASIS, +dnl WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +dnl See the License for the specific language governing permissions and +dnl limitations under the License. +dnl --------------------------------------------------------------------------- + +AC_DEFUN([CUSTOM_M4_SETUP], +[ + AC_MSG_CHECKING(which arch to build for) + AC_ARG_WITH([arch], + [AS_HELP_STRING([--with-arch@<:@=ARCH@:>@], + [Build for the specified architecture. Pick from: i386, x86_64.])], + [ + AS_IF(test -n "$withval", [ + ARCH="$withval" + AC_MSG_RESULT([yes, archs: $ARCH]) + ]) + ],[ + ARCH="" + AC_MSG_RESULT([no]) + ]) + AS_IF(test "$ARCH" = "i386", [ + FLAGS="-m32" + ], test "ARCH" = "x86_64", [ + FLAGS="-m64" + ], [ + FLAGS="" + ]) + AS_IF(test -n "$FLAGS", [ + CFLAGS="$FLAGS $CFLAGS" + CXXFLAGS="$FLAGS $CXXFLAGS" + LDFLAGS="$FLAGS $ARCH $LDFLAGS" + AC_SUBST(CFLAGS) + AC_SUBST(CXXFLAGS) + AC_SUBST(LDFLAGS) + ]) +]) \ No newline at end of file diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketEchoTest.java new file mode 100644 index 0000000000..d3c70a1178 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketEchoTest.java @@ -0,0 +1,31 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketEchoTest; + +import java.util.List; + +public class EpollSocketEchoTest extends SocketEchoTest { + + @Override + protected List> newFactories() { + return EpollTestUtils.newFactories(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFileRegionTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFileRegionTest.java new file mode 100644 index 0000000000..cecd2960e9 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFileRegionTest.java @@ -0,0 +1,31 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketFileRegionTest; + +import java.util.List; + +public class EpollSocketFileRegionTest extends SocketFileRegionTest { + + @Override + protected List> newFactories() { + return EpollTestUtils.newFactories(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFixedLengthEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFixedLengthEchoTest.java new file mode 100644 index 0000000000..16392bf284 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFixedLengthEchoTest.java @@ -0,0 +1,31 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketFixedLengthEchoTest; + +import java.util.List; + +public class EpollSocketFixedLengthEchoTest extends SocketFixedLengthEchoTest { + + @Override + protected List> newFactories() { + return EpollTestUtils.newFactories(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketGatheringWriteTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketGatheringWriteTest.java new file mode 100644 index 0000000000..ed5d09413d --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketGatheringWriteTest.java @@ -0,0 +1,31 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketGatheringWriteTest; + +import java.util.List; + +public class EpollSocketGatheringWriteTest extends SocketGatheringWriteTest { + + @Override + protected List> newFactories() { + return EpollTestUtils.newFactories(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketObjectEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketObjectEchoTest.java new file mode 100644 index 0000000000..b6305590ca --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketObjectEchoTest.java @@ -0,0 +1,31 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketObjectEchoTest; + +import java.util.List; + +public class EpollSocketObjectEchoTest extends SocketObjectEchoTest { + + @Override + protected List> newFactories() { + return EpollTestUtils.newFactories(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslEchoTest.java new file mode 100644 index 0000000000..24eccf4f19 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslEchoTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketSslEchoTest; + +import java.util.List; + +public class EpollSocketSslEchoTest extends SocketSslEchoTest { + + public EpollSocketSslEchoTest(boolean serverUsesDelegatedTaskExecutor, + boolean clientUsesDelegatedTaskExecutor, + boolean useChunkedWriteHandler, + boolean useCompositeByteBuf) { + super(serverUsesDelegatedTaskExecutor, clientUsesDelegatedTaskExecutor, + useChunkedWriteHandler, useCompositeByteBuf); + } + + @Override + protected List> newFactories() { + return EpollTestUtils.newFactories(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStartTlsTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStartTlsTest.java new file mode 100644 index 0000000000..c290c6a812 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStartTlsTest.java @@ -0,0 +1,31 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketStartTlsTest; + +import java.util.List; + +public class EpollSocketStartTlsTest extends SocketStartTlsTest { + + @Override + protected List> newFactories() { + return EpollTestUtils.newFactories(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStringEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStringEchoTest.java new file mode 100644 index 0000000000..a8a7bb8889 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStringEchoTest.java @@ -0,0 +1,31 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketStringEchoTest; + +import java.util.List; + +public class EpollSocketStringEchoTest extends SocketStringEchoTest { + + @Override + protected List> newFactories() { + return EpollTestUtils.newFactories(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollTestUtils.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollTestUtils.java new file mode 100644 index 0000000000..e2ba895693 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollTestUtils.java @@ -0,0 +1,46 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.EventLoopGroup; +import io.netty.testsuite.transport.TestsuitePermutation; + +import java.util.Collections; +import java.util.List; + +final class EpollTestUtils { + private static final EventLoopGroup GROUP = new EpollEventLoopGroup(); + static List> newFactories() { + return Collections.>singletonList( + new TestsuitePermutation.BootstrapComboFactory() { + @Override + public ServerBootstrap newServerInstance() { + return new ServerBootstrap().group(GROUP).channel(EpollServerSocketChannel.class); + } + + @Override + public Bootstrap newClientInstance() { + return new Bootstrap().group(GROUP).channel(EpollSocketChannel.class); + } + }); + } + + private EpollTestUtils() { + // utility class + } +}