Introduce a native transport for linux using epoll ET

This transport use JNI (C) to directly make use of epoll in Edge-Triggered mode for maximal performance on Linux. Beside this it also support using TCP_CORK and produce less GC then the NIO transport using JDK NIO.
It only builds on linux and skip the build if linux is not used. The transport produce a jar which contains all needed .so files for 32bit and 64 bit. The user only need to include the jar as dependency as usually
to make use of it and use the correct classes.

This includes also some cleanup of @trustin
This commit is contained in:
Norman Maurer 2014-02-15 22:26:36 +01:00
parent a0e74ff984
commit 9330172f80
27 changed files with 3638 additions and 1 deletions

View File

@ -0,0 +1,149 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.util.Locale;
import java.util.regex.Pattern;
/**
* Helper class to load JNI resources.
*
*/
public final class NativeLibraryLoader {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NativeLibraryLoader.class);
private static final Pattern REPLACE = Pattern.compile("\\W+");
private static final File WORKDIR;
static {
String workdir = SystemPropertyUtil.get("io.netty.native.workdir");
if (workdir != null) {
File f = new File(workdir);
if (!f.exists()) {
// ok to ignore as createTempFile will take care
//noinspection ResultOfMethodCallIgnored
f.mkdirs();
}
try {
f = f.getAbsoluteFile();
} catch (Exception ignored) {
// Good to have an absolute path, but it's OK.
}
WORKDIR = f;
logger.debug("-Dio.netty.netty.workdir: {}", WORKDIR);
} else {
WORKDIR = PlatformDependent.tmpdir();
logger.debug("-Dio.netty.netty.workdir: {} (io.netty.tmpdir)", WORKDIR);
}
}
/**
* Load the given library with the specified {@link java.lang.ClassLoader}
*/
public static void load(String name, ClassLoader loader) {
String libname = System.mapLibraryName(name);
String path = "META-INF/native/" + osIdentifier() + PlatformDependent.bitMode() + '/' + libname;
URL url = loader.getResource(path);
if (url == null) {
// Fall back to normal loading of JNI stuff
System.loadLibrary(name);
} else {
int index = libname.lastIndexOf('.');
String prefix = libname.substring(0, index);
String suffix = libname.substring(index, libname.length());
InputStream in = null;
OutputStream out = null;
File tmpFile = null;
boolean loaded = false;
try {
tmpFile = File.createTempFile(prefix, suffix, WORKDIR);
in = url.openStream();
out = new FileOutputStream(tmpFile);
byte[] buffer = new byte[8192];
int length;
while ((length = in.read(buffer)) > 0) {
out.write(buffer, 0, length);
}
out.flush();
out.close();
out = null;
System.load(tmpFile.getPath());
loaded = true;
} catch (Exception e) {
throw (UnsatisfiedLinkError) new UnsatisfiedLinkError(
"could not load a native library: " + name).initCause(e);
} finally {
if (in != null) {
try {
in.close();
} catch (IOException ignore) {
// ignore
}
}
if (out != null) {
try {
out.close();
} catch (IOException ignore) {
// ignore
}
}
if (tmpFile != null) {
if (loaded) {
tmpFile.deleteOnExit();
} else {
if (!tmpFile.delete()) {
tmpFile.deleteOnExit();
}
}
}
}
}
}
private static String osIdentifier() {
String name = SystemPropertyUtil.get("os.name", "unknown").toLowerCase(Locale.US).trim();
if (name.startsWith("win")) {
return "windows";
}
if (name.startsWith("mac os x")) {
return "osx";
}
if (name.startsWith("linux")) {
return "linux";
}
return REPLACE.matcher(name).replaceAll("_");
}
private NativeLibraryLoader() {
// Utility
}
}

View File

@ -21,6 +21,7 @@ import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
@ -74,6 +75,10 @@ public final class PlatformDependent {
private static final boolean HAS_JAVASSIST = hasJavassist0();
private static final File TMPDIR = tmpdir0();
private static final int BIT_MODE = bitMode0();
static {
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.noPreferDirect: {}", !DIRECT_BUFFER_PREFERRED);
@ -153,6 +158,20 @@ public final class PlatformDependent {
return HAS_JAVASSIST;
}
/**
* Returns the temporary directory.
*/
public static File tmpdir() {
return TMPDIR;
}
/**
* Returns the bit mode of the current VM (usually 32 or 64.)
*/
public static int bitMode() {
return BIT_MODE;
}
/**
* Raises an exception bypassing compiler checks for checked exceptions.
*/
@ -638,6 +657,130 @@ public final class PlatformDependent {
}
}
private static File tmpdir0() {
File f;
try {
f = toDirectory(SystemPropertyUtil.get("io.netty.tmpdir"));
if (f != null) {
logger.debug("-Dio.netty.tmpdir: {}", f);
return f;
}
f = toDirectory(SystemPropertyUtil.get("java.io.tmpdir"));
if (f != null) {
logger.debug("-Dio.netty.tmpdir: {} (java.io.tmpdir)", f);
return f;
}
// This shouldn't happen, but just in case ..
if (isWindows()) {
f = toDirectory(System.getenv("TEMP"));
if (f != null) {
logger.debug("-Dio.netty.tmpdir: {} (%TEMP%)", f);
return f;
}
String userprofile = System.getenv("USERPROFILE");
if (userprofile != null) {
f = toDirectory(userprofile + "\\AppData\\Local\\Temp");
if (f != null) {
logger.debug("-Dio.netty.tmpdir: {} (%USERPROFILE%\\AppData\\Local\\Temp)", f);
return f;
}
f = toDirectory(userprofile + "\\Local Settings\\Temp");
if (f != null) {
logger.debug("-Dio.netty.tmpdir: {} (%USERPROFILE%\\Local Settings\\Temp)", f);
return f;
}
}
} else {
f = toDirectory(System.getenv("TMPDIR"));
if (f != null) {
logger.debug("-Dio.netty.tmpdir: {} ($TMPDIR)", f);
return f;
}
}
} catch (Exception ignored) {
// Environment variable inaccessible
}
// Last resort.
if (isWindows()) {
f = new File("C:\\Windows\\Temp");
} else {
f = new File("/tmp");
}
logger.warn("Failed to get the temporary directory; falling back to: {}", f);
return f;
}
@SuppressWarnings("ResultOfMethodCallIgnored")
private static File toDirectory(String path) {
if (path == null) {
return null;
}
File f = new File(path);
if (!f.exists()) {
f.mkdirs();
}
if (!f.isDirectory()) {
return null;
}
try {
return f.getAbsoluteFile();
} catch (Exception ignored) {
return f;
}
}
private static int bitMode0() {
// Check user-specified bit mode first.
int bitMode = SystemPropertyUtil.getInt("io.netty.bitMode", 0);
if (bitMode > 0) {
logger.debug("-Dio.netty.bitMode: {}", bitMode);
return bitMode;
}
// And then the vendor specific ones which is probably most reliable.
bitMode = SystemPropertyUtil.getInt("sun.arch.data.model", 0);
if (bitMode > 0) {
logger.debug("-Dio.netty.bitMode: {} (sun.arch.data.model)", bitMode);
return bitMode;
}
bitMode = SystemPropertyUtil.getInt("com.ibm.vm.bitmode", 0);
if (bitMode > 0) {
logger.debug("-Dio.netty.bitMode: {} (com.ibm.vm.bitmode)", bitMode);
return bitMode;
}
// os.arch also gives us a good hint.
String arch = SystemPropertyUtil.get("os.arch", "").toLowerCase(Locale.US).trim();
if ("amd64".equals(arch) || "x86_64".equals(arch)) {
bitMode = 64;
} else if ("i386".equals(arch) || "i486".equals(arch) || "i586".equals(arch) || "i686".equals(arch)) {
bitMode = 32;
}
if (bitMode > 0) {
logger.debug("-Dio.netty.bitMode: {} (os.arch: {})", bitMode, arch);
}
// Last resort: guess from VM name and then fall back to most common 64-bit mode.
String vm = SystemPropertyUtil.get("java.vm.name", "").toLowerCase(Locale.US);
Pattern BIT_PATTERN = Pattern.compile("([1-9][0-9]+)-?bit");
Matcher m = BIT_PATTERN.matcher(vm);
if (m.find()) {
return Integer.parseInt(m.group(1));
} else {
return 64;
}
}
private PlatformDependent() {
// only static method supported
}

17
pom.xml
View File

@ -120,6 +120,17 @@
<maven.javadoc.failOnError>false</maven.javadoc.failOnError>
</properties>
</profile>
<profile>
<id>linux-native</id>
<activation>
<os>
<family>linux</family>
</os>
</activation>
<modules>
<module>transport-native-epoll</module>
</modules>
</profile>
</profiles>
<properties>
@ -499,6 +510,10 @@
<goal>manifest</goal>
</goals>
<configuration>
<supportedProjectTypes>
<supportedProjectType>jar</supportedProjectType>
<supportedProjectType>bundle</supportedProjectType>
</supportedProjectTypes>
<instructions>
<Export-Package>${project.groupId}.*</Export-Package>
<!-- enforce JVM vendor package as optional -->
@ -565,7 +580,7 @@
<version>2.4.2</version>
<configuration>
<useReleaseProfile>false</useReleaseProfile>
<arguments>-P release,sonatype-oss-release,full,no-osgi</arguments>
<arguments>-P release,sonatype-oss-release,full,no-osgi,linux-native</arguments>
<autoVersionSubmodules>true</autoVersionSubmodules>
<allowTimestampedSnapshots>false</allowTimestampedSnapshots>
<tagNameFormat>netty-@{project.version}</tagNameFormat>

View File

@ -0,0 +1,3 @@
# Native transport for Linux
See [our wiki page](http://netty.io/wiki/native-transports.html).

View File

@ -0,0 +1,113 @@
<?xml version="1.0" encoding="ISO-8859-15"?>
<!--
~ Copyright 2014 The Netty Project
~
~ The Netty Project licenses this file to you under the Apache License,
~ version 2.0 (the "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at:
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.1.0.Alpha1-SNAPSHOT</version>
</parent>
<artifactId>netty-transport-native-epoll</artifactId>
<name>Netty/Transport/Native/Epoll</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-testsuite</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.fusesource.hawtjni</groupId>
<artifactId>maven-hawtjni-plugin</artifactId>
<version>1.10</version>
<executions>
<execution>
<id>build-linux64</id>
<configuration>
<name>${project.artifactId}</name>
<buildDirectory>${project.build.directory}/linux64</buildDirectory>
<nativeSourceDirectory>${nativeSourceDirectory}</nativeSourceDirectory>
<libDirectory>${libDirectory}</libDirectory>
<configureArgs>
<arg>--with-arch=x86_64</arg>
</configureArgs>
<platform>linux64</platform>
<forceConfigure>true</forceConfigure>
<forceAutogen>true</forceAutogen>
</configuration>
<goals>
<goal>generate</goal>
<goal>build</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>build-linux32</id>
<configuration>
<buildDirectory>${project.build.directory}/linux32</buildDirectory>
<nativeSourceDirectory>${nativeSourceDirectory}</nativeSourceDirectory>
<libDirectory>${libDirectory}</libDirectory>
<name>${project.artifactId}</name>
<configureArgs>
<arg>--with-arch=i386</arg>
</configureArgs>
<platform>linux32</platform>
<forceConfigure>true</forceConfigure>
<forceAutogen>true</forceAutogen>
</configuration>
<goals>
<goal>generate</goal>
<goal>build</goal>
</goals>
<phase>compile</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
<properties>
<nativeSourceDirectory>${basedir}/src/main/c</nativeSourceDirectory>
<libDirectory>${basedir}/target/classes/</libDirectory>
</properties>
</project>

View File

@ -0,0 +1,859 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
#include <jni.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/sendfile.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include "io_netty_channel_epoll_Native.h"
// optional
extern int accept4(int sockFd, struct sockaddr *addr, socklen_t *addrlen, int flags) __attribute__((weak));
// Those are initialized in the init(...) method and cached for performance reasons
jmethodID posId = NULL;
jmethodID limitId = NULL;
jfieldID posFieldId = NULL;
jfieldID limitFieldId = NULL;
jfieldID fileChannelFieldId = NULL;
jfieldID transferedFieldId = NULL;
jfieldID fdFieldId = NULL;
jfieldID fileDescriptorFieldId = NULL;
jmethodID inetSocketAddrMethodId = NULL;
jclass runtimeExceptionClass = NULL;
jclass ioExceptionClass = NULL;
jclass closedChannelExceptionClass = NULL;
jmethodID closedChannelExceptionMethodId = NULL;
jclass inetSocketAddressClass = NULL;
static int socketType;
// util methods
void throwRuntimeException(JNIEnv *env, char *message) {
(*env)->ThrowNew(env, runtimeExceptionClass, message);
}
void throwIOException(JNIEnv *env, char *message) {
(*env)->ThrowNew(env, ioExceptionClass, message);
}
void throwClosedChannelException(JNIEnv *env) {
jobject exception = (*env)->NewObject(env, closedChannelExceptionClass, closedChannelExceptionMethodId);
(*env)->Throw(env, exception);
}
void throwOutOfMemoryError( JNIEnv *env, char *message) {
jclass exceptionClass = (*env)->FindClass(env, "java/lang/OutOfMemoryError");
(*env)->ThrowNew(env, exceptionClass, message);
}
char *exceptionMessage(char *msg, int error) {
char *err = strerror(error);
char *result = malloc(strlen(msg) + strlen(err) + 1);
strcpy(result, msg);
strcat(result, err);
return result;
}
jint epollCtl(JNIEnv * env, jint efd, int op, jint fd, jint flags, jint id) {
uint32_t events = EPOLLET;
if (flags & EPOLL_ACCEPT) {
events |= EPOLLIN;
}
if (flags & EPOLL_READ) {
events |= EPOLLIN | EPOLLRDHUP;
}
if (flags & EPOLL_WRITE) {
events |= EPOLLOUT;
}
struct epoll_event ev = {
.events = events,
// encode the id into the events
.data.u64 = (((uint64_t) id) << 32L)
};
return epoll_ctl(efd, op, fd, &ev);
}
jint getOption(JNIEnv *env, jint fd, int level, int optname, const void *optval, socklen_t optlen) {
int code;
code = getsockopt(fd, level, optname, &optval, &optlen);
if (code == 0) {
return 0;
}
int err = errno;
throwRuntimeException(env, exceptionMessage("Error during getsockopt(...): ", err));
return code;
}
int setOption(JNIEnv *env, jint fd, int level, int optname, const void *optval, socklen_t len) {
int rc = setsockopt(fd, level, optname, optval, len);
if (rc < 0) {
int err = errno;
throwRuntimeException(env, exceptionMessage("Error during setsockopt(...): ", err));
}
return rc;
}
jobject createInetSocketAddress(JNIEnv * env, struct sockaddr_storage addr) {
char ipstr[INET6_ADDRSTRLEN];
int port;
if (addr.ss_family == AF_INET) {
struct sockaddr_in *s = (struct sockaddr_in *)&addr;
port = ntohs(s->sin_port);
inet_ntop(AF_INET, &s->sin_addr, ipstr, sizeof ipstr);
} else {
struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr;
port = ntohs(s->sin6_port);
inet_ntop(AF_INET6, &s->sin6_addr, ipstr, sizeof ipstr);
}
jobject socketAddr = (*env)->NewObject(env, inetSocketAddressClass, inetSocketAddrMethodId, ipstr, port);
return socketAddr;
}
void init_sockaddr(JNIEnv * env, jbyteArray address, jint scopeId, jint jport, struct sockaddr_storage * addr) {
uint16_t port = htons((uint16_t) jport);
jbyte* addressBytes = (*env)->GetByteArrayElements(env, address, 0);
if (socketType == AF_INET6) {
struct sockaddr_in6* ip6addr = (struct sockaddr_in6 *) addr;
ip6addr->sin6_family = AF_INET6;
ip6addr->sin6_port = port;
if (scopeId != 0) {
ip6addr->sin6_scope_id = (uint32_t) scopeId;
}
memcpy( &(ip6addr->sin6_addr.s6_addr), addressBytes, 16);
} else {
struct sockaddr_in* ipaddr = (struct sockaddr_in *) addr;
ipaddr->sin_family = AF_INET;
ipaddr->sin_port = port;
memcpy( &(ipaddr->sin_addr.s_addr), addressBytes + 12, 4);
}
(*env)->ReleaseByteArrayElements(env, address, addressBytes, JNI_ABORT);
}
static int socket_type() {
int fd = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (fd == -1) {
if (errno == EAFNOSUPPORT) {
return AF_INET;
}
return AF_INET6;
} else {
close(fd);
return AF_INET6;
}
}
// util methods end
jint JNI_OnLoad(JavaVM* vm, void* reserved) {
JNIEnv* env;
if ((*vm)->GetEnv(vm, (void **) &env, JNI_VERSION_1_6) != JNI_OK) {
return JNI_ERR;
} else {
// cache classes that are used within other jni methods for performance reasons
jclass localClosedChannelExceptionClass = (*env)->FindClass(env, "java/nio/channels/ClosedChannelException");
if (localClosedChannelExceptionClass == NULL) {
// pending exception...
return JNI_ERR;
}
closedChannelExceptionClass = (jclass) (*env)->NewGlobalRef(env, localClosedChannelExceptionClass);
if (closedChannelExceptionClass == NULL) {
// out-of-memory!
throwOutOfMemoryError(env, "Error allocating memory");
return JNI_ERR;
}
closedChannelExceptionMethodId = (*env)->GetMethodID(env, closedChannelExceptionClass, "<init>", "()V");
if (closedChannelExceptionMethodId == NULL) {
throwRuntimeException(env, "Unable to obtain constructor of ClosedChannelException");
return JNI_ERR;
}
jclass localRuntimeExceptionClass = (*env)->FindClass(env, "java/lang/RuntimeException");
if (localRuntimeExceptionClass == NULL) {
// pending exception...
return JNI_ERR;
}
runtimeExceptionClass = (jclass) (*env)->NewGlobalRef(env, localRuntimeExceptionClass);
if (runtimeExceptionClass == NULL) {
// out-of-memory!
throwOutOfMemoryError(env, "Error allocating memory");
return JNI_ERR;
}
jclass localIoExceptionClass = (*env)->FindClass(env, "java/io/IOException");
if (localIoExceptionClass == NULL) {
// pending exception...
return JNI_ERR;
}
ioExceptionClass = (jclass) (*env)->NewGlobalRef(env, localIoExceptionClass);
if (ioExceptionClass == NULL) {
// out-of-memory!
throwOutOfMemoryError(env, "Error allocating memory");
return JNI_ERR;
}
jclass localInetSocketAddressClass = (*env)->FindClass(env, "java/net/InetSocketAddress");
if (localIoExceptionClass == NULL) {
// pending exception...
return JNI_ERR;
}
inetSocketAddressClass = (jclass) (*env)->NewGlobalRef(env, localInetSocketAddressClass);
if (inetSocketAddressClass == NULL) {
// out-of-memory!
throwOutOfMemoryError(env, "Error allocating memory");
return JNI_ERR;
}
void *mem = malloc(1);
if (mem == NULL) {
throwOutOfMemoryError(env, "Error allocating native buffer");
return JNI_ERR;
}
jobject directBuffer = (*env)->NewDirectByteBuffer(env, mem, 1);
if (directBuffer == NULL) {
throwOutOfMemoryError(env, "Error allocating native buffer");
return JNI_ERR;
}
jclass cls = (*env)->GetObjectClass(env, directBuffer);
// Get the method id for Buffer.position() and Buffer.limit(). These are used as fallback if
// it is not possible to obtain the position and limit using the fields directly.
posId = (*env)->GetMethodID(env, cls, "position", "()I");
if (posId == NULL) {
// position method was not found.. something is wrong so bail out
throwRuntimeException(env, "Unable to find method ByteBuffer.position()");
return JNI_ERR;
}
limitId = (*env)->GetMethodID(env, cls, "limit", "()I");
if (limitId == NULL) {
// limit method was not found.. something is wrong so bail out
throwRuntimeException(env, "Unable to find method ByteBuffer.limit()");
return JNI_ERR;
}
// Try to get the ids of the position and limit fields. We later then check if we was able
// to find them and if so use them get the position and limit of the buffer. This is
// much faster then call back into java via (*env)->CallIntMethod(...).
posFieldId = (*env)->GetFieldID(env, cls, "position", "I");
if (posFieldId == NULL) {
// this is ok as we can still use the method so just clear the exception
(*env)->ExceptionClear(env);
}
limitFieldId = (*env)->GetFieldID(env, cls, "limit", "I");
if (limitFieldId == NULL) {
// this is ok as we can still use the method so just clear the exception
(*env)->ExceptionClear(env);
}
jclass fileRegionCls = (*env)->FindClass(env, "io/netty/channel/DefaultFileRegion");
if (fileRegionCls == NULL) {
// pending exception...
return JNI_ERR;
}
fileChannelFieldId = (*env)->GetFieldID(env, fileRegionCls, "file", "Ljava/nio/channels/FileChannel;");
if (fileChannelFieldId == NULL) {
throwRuntimeException(env, "Unable to obtain FileChannel field for DefaultFileRegion");
return JNI_ERR;
}
transferedFieldId = (*env)->GetFieldID(env, fileRegionCls, "transfered", "J");
if (transferedFieldId == NULL) {
throwRuntimeException(env, "Unable to obtain transfered field for DefaultFileRegion");
return JNI_ERR;
}
jclass fileChannelCls = (*env)->FindClass(env, "sun/nio/ch/FileChannelImpl");
if (fileChannelCls == NULL) {
// pending exception...
return JNI_ERR;
}
fileDescriptorFieldId = (*env)->GetFieldID(env, fileChannelCls, "fd", "Ljava/io/FileDescriptor;");
if (fileDescriptorFieldId == NULL) {
throwRuntimeException(env, "Unable to obtain fd field for FileChannelImpl");
return JNI_ERR;
}
jclass fileDescriptorCls = (*env)->FindClass(env, "java/io/FileDescriptor");
if (fileDescriptorCls == NULL) {
// pending exception...
return JNI_ERR;
}
fdFieldId = (*env)->GetFieldID(env, fileDescriptorCls, "fd", "I");
if (fdFieldId == NULL) {
throwRuntimeException(env, "Unable to obtain fd field for FileDescriptor");
return JNI_ERR;
}
inetSocketAddrMethodId = (*env)->GetMethodID(env, inetSocketAddressClass, "<init>", "(Ljava/lang/String;I)V");
if (inetSocketAddrMethodId == NULL) {
throwRuntimeException(env, "Unable to obtain constructor of InetSocketAddress");
return JNI_ERR;
}
socketType = socket_type();
return JNI_VERSION_1_6;
}
}
void JNI_OnUnload(JavaVM *vm, void *reserved) {
JNIEnv* env;
if ((*vm)->GetEnv(vm, (void **) &env, JNI_VERSION_1_6) != JNI_OK) {
// Something is wrong but nothing we can do about this :(
return;
} else {
// delete global references so the GC can collect them
if (runtimeExceptionClass != NULL) {
(*env)->DeleteGlobalRef(env, runtimeExceptionClass);
}
if (ioExceptionClass != NULL) {
(*env)->DeleteGlobalRef(env, ioExceptionClass);
}
if (closedChannelExceptionClass != NULL) {
(*env)->DeleteGlobalRef(env, closedChannelExceptionClass);
}
if (inetSocketAddressClass != NULL) {
(*env)->DeleteGlobalRef(env, inetSocketAddressClass);
}
}
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_eventFd(JNIEnv * env, jclass clazz) {
jint eventFD = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (eventFD < 0) {
int err = errno;
throwRuntimeException(env, exceptionMessage("Error creating eventFD(...): ", err));
}
return eventFD;
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_eventFdWrite(JNIEnv * env, jclass clazz, jint fd, jlong value) {
jint eventFD = eventfd_write(fd, (eventfd_t)value);
if (eventFD < 0) {
int err = errno;
throwRuntimeException(env, exceptionMessage("Error calling eventfd_write(...): ", err));
}
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_eventFdRead(JNIEnv * env, jclass clazz, jint fd) {
uint64_t eventfd_t;
if (eventfd_read(fd, &eventfd_t) != 0) {
// something is serious wrong
throwRuntimeException(env, "Error calling eventfd_read(...)");
}
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollCreate(JNIEnv * env, jclass clazz) {
jint efd = epoll_create1(EPOLL_CLOEXEC);
if (efd < 0) {
int err = errno;
throwRuntimeException(env, exceptionMessage("Error during epoll_create(...): ", err));
}
return efd;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollWait(JNIEnv * env, jclass clazz, jint efd, jlongArray events, jint timeout) {
int len = (*env)->GetArrayLength(env, events);
struct epoll_event ev[len];
int ready;
int err;
do {
ready = epoll_wait(efd, ev, len, timeout);
// was interrupted try again.
} while (ready == -1 && (( err = errno) == EINTR));
if (ready < 0) {
throwIOException(env, exceptionMessage("Error during epoll_wait(...): ", err));
return -1;
}
if (ready == 0) {
// nothing ready for process
return 0;
}
jboolean isCopy;
jlong *elements = (*env)->GetLongArrayElements(env, events, &isCopy);
if (elements == NULL) {
// No memory left ?!?!?
throwOutOfMemoryError(env, "Can't allocate memory");
return -1;
}
int i;
for (i = 0; i < ready; i++) {
// store the ready ops and id
elements[i] = (jlong) ev[i].data.u64;
if (ev[i].events & EPOLLIN) {
elements[i] |= EPOLL_READ;
}
if (ev[i].events & EPOLLRDHUP) {
elements[i] |= EPOLL_RDHUP;
}
if (ev[i].events & EPOLLOUT) {
elements[i] |= EPOLL_WRITE;
}
}
jint mode;
// release again to prevent memory leak
if (isCopy) {
mode = 0;
} else {
// was just pinned so use JNI_ABORT to eliminate not needed copy.
mode = JNI_ABORT;
}
(*env)->ReleaseLongArrayElements(env, events, elements, mode);
return ready;
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlAdd(JNIEnv * env, jclass clazz, jint efd, jint fd, jint flags, jint id) {
if (epollCtl(env, efd, EPOLL_CTL_ADD, fd, flags, id) < 0) {
int err = errno;
throwRuntimeException(env, exceptionMessage("Error during calling epoll_ctl(...): ", err));
}
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlMod(JNIEnv * env, jclass clazz, jint efd, jint fd, jint flags, jint id) {
if (epollCtl(env, efd, EPOLL_CTL_MOD, fd, flags, id) < 0) {
int err = errno;
throwRuntimeException(env, exceptionMessage("Error during calling epoll_ctl(...): ", err));
}
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv * env, jclass clazz, jint efd, jint fd) {
// Create an empty event to workaround a bug in older kernels which can not handle NULL.
struct epoll_event event = { 0 };
if (epoll_ctl(efd, EPOLL_CTL_DEL, fd, &event) < 0) {
int err = errno;
throwRuntimeException(env, exceptionMessage("Error during calling epoll_ctl(...): ", err));
}
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit) {
// TODO: We could also maybe pass the address in directly and so eliminate this call
// not sure if this would buy us much. So some testing is needed.
void *buffer = (*env)->GetDirectBufferAddress(env, jbuffer);
if (buffer == NULL) {
throwRuntimeException(env, "Unable to access address of buffer");
return -1;
}
ssize_t res;
int err;
do {
res = write(fd, buffer + pos, (size_t) (limit - pos));
// keep on writing if it was interrupted
} while(res == -1 && ((err = errno) == EINTR));
if (res < 0) {
// network stack saturated... try again later
if (err == EAGAIN || err == EWOULDBLOCK) {
return 0;
}
if (err == EBADF) {
throwClosedChannelException(env);
return -1;
}
throwIOException(env, exceptionMessage("Error while write(...): ", err));
return -1;
}
return (jint) res;
}
JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length) {
struct iovec iov[length];
int i;
int iovidx = 0;
for (i = offset; i < length; i++) {
jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i);
jint pos;
// Get the current position using the (*env)->GetIntField if possible and fallback
// to slower (*env)->CallIntMethod(...) if needed
if (posFieldId == NULL) {
pos = (*env)->CallIntMethod(env, bufObj, posId, NULL);
} else {
pos = (*env)->GetIntField(env, bufObj, posFieldId);
}
jint limit;
// Get the current limit using the (*env)->GetIntField if possible and fallback
// to slower (*env)->CallIntMethod(...) if needed
if (limitFieldId == NULL) {
limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL);
} else {
limit = (*env)->GetIntField(env, bufObj, limitFieldId);
}
void *buffer = (*env)->GetDirectBufferAddress(env, bufObj);
if (buffer == NULL) {
throwRuntimeException(env, "Unable to access address of buffer");
return -1;
}
iov[iovidx].iov_base = buffer + pos;
iov[iovidx].iov_len = (size_t) (limit - pos);
iovidx++;
}
ssize_t res;
int err;
do {
res = writev(fd, iov, length);
// keep on writing if it was interrupted
} while(res == -1 && ((err = errno) == EINTR));
if (res < 0) {
if (err == EAGAIN || err == EWOULDBLOCK) {
// network stack is saturated we will try again later
return 0;
}
if (err == EBADF) {
throwClosedChannelException(env);
return -1;
}
throwIOException(env, exceptionMessage("Error while write(...): ", err));
return -1;
}
return res;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit) {
// TODO: We could also maybe pass the address in directly and so eliminate this call
// not sure if this would buy us much. So some testing is needed.
void *buffer = (*env)->GetDirectBufferAddress(env, jbuffer);
if (buffer == NULL) {
throwRuntimeException(env, "Unable to access address of buffer");
return -1;
}
ssize_t res;
int err;
do {
res = read(fd, buffer + pos, (size_t) (limit - pos));
// Keep on reading if we was interrupted
} while (res == -1 && ((err = errno) == EINTR));
if (res < 0) {
if (err == EAGAIN || err == EWOULDBLOCK) {
// Nothing left to read
return 0;
}
if (err == EBADF) {
throwClosedChannelException(env);
return -1;
}
throwIOException(env, exceptionMessage("Error while read(...): ", err));
return -1;
}
if (res == 0) {
// end-of-stream
return -1;
}
return (jint) res;
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_close(JNIEnv * env, jclass clazz, jint fd) {
if (close(fd) < 0) {
throwIOException(env, "Error closing file descriptor");
}
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_shutdown(JNIEnv * env, jclass clazz, jint fd, jboolean read, jboolean write) {
int mode;
if (read && write) {
mode = SHUT_RDWR;
} else if (read) {
mode = SHUT_RD;
} else if (write) {
mode = SHUT_WR;
}
if (shutdown(fd, mode) < 0) {
throwIOException(env, "Error shutdown socket file descriptor");
}
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_socket(JNIEnv * env, jclass clazz) {
// TODO: Maybe also respect -Djava.net.preferIPv4Stack=true
int fd = socket(socketType, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (fd == -1) {
int err = errno;
throwIOException(env, exceptionMessage("Error creating socket: ", err));
return -1;
} else if (socketType == AF_INET6){
// Allow to listen /connect ipv4 and ipv6
int optval = 0;
if (setOption(env, fd, IPPROTO_IPV6, IPV6_V6ONLY, &optval, sizeof(optval)) < 0) {
// Something went wrong so close the fd and return here. setOption(...) itself throws the exception already.
close(fd);
return -1;
}
}
return fd;
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_bind(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port) {
struct sockaddr_storage addr;
init_sockaddr(env, address, scopeId, port, &addr);
if(bind(fd, (struct sockaddr *) &addr, sizeof(addr)) == -1){
int err = errno;
throwIOException(env, exceptionMessage("Error during bind(...): ", err));
}
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_listen(JNIEnv * env, jclass clazz, jint fd, jint backlog) {
if(listen(fd, backlog) == -1) {
int err = errno;
throwIOException(env, exceptionMessage("Error during listen(...): ", err));
}
}
JNIEXPORT jboolean JNICALL Java_io_netty_channel_epoll_Native_connect(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port) {
struct sockaddr_storage addr;
init_sockaddr(env, address, scopeId, port, &addr);
int res;
int err;
do {
res = connect(fd, (struct sockaddr *) &addr, sizeof(addr));
} while (res == -1 && ((err = errno) == EINTR));
if (res < 0) {
if (err == EINPROGRESS) {
// connect not complete yet need to wait for EPOLLOUT event
return JNI_FALSE;
}
throwIOException(env, exceptionMessage("Unable to connect to remote host: ", err));
return JNI_FALSE;
}
return JNI_TRUE;
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_finishConnect(JNIEnv * env, jclass clazz, jint fd) {
// connect done, check for error
int optval;
int res = getOption(env, fd, SOL_SOCKET, SO_ERROR, &optval, sizeof(optval));
if (res == 0) {
return;
}
throwIOException(env, exceptionMessage("Unable to connect to remote host: ", optval));
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_accept(JNIEnv * env, jclass clazz, jint fd) {
jint socketFd;
int err;
do {
if (accept4) {
socketFd = accept4(fd, NULL, 0, SOCK_NONBLOCK | SOCK_CLOEXEC);
} else {
socketFd = accept(fd, NULL, 0);
}
} while (socketFd == -1 && ((err = errno) == EINTR));
if (socketFd == -1) {
if (err == EAGAIN || err == EWOULDBLOCK) {
// Everything consumed so just return -1 here.
return -1;
} else {
throwIOException(env, exceptionMessage("Error during accept(...): ", err));
return -1;
}
}
if (accept4) {
return socketFd;
} else {
// accept4 was not present so need two more sys-calls ...
if (fcntl(socketFd, F_SETFD, FD_CLOEXEC) == -1) {
throwIOException(env, exceptionMessage("Error during accept(...): ", err));
return -1;
}
if (fcntl(socketFd, F_SETFL, O_NONBLOCK) == -1) {
throwIOException(env, exceptionMessage("Error during accept(...): ", err));
return -1;
}
}
return socketFd;
}
JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_sendfile(JNIEnv *env, jclass clazz, jint fd, jobject fileRegion, jlong off, jlong len) {
jobject fileChannel = (*env)->GetObjectField(env, fileRegion, fileChannelFieldId);
if (fileChannel == NULL) {
throwRuntimeException(env, "Unable to obtain FileChannel from FileRegion");
return -1;
}
jobject fileDescriptor = (*env)->GetObjectField(env, fileChannel, fileDescriptorFieldId);
if (fileDescriptor == NULL) {
throwRuntimeException(env, "Unable to obtain FileDescriptor from FileChannel");
return -1;
}
jint srcFd = (*env)->GetIntField(env, fileDescriptor, fdFieldId);
if (srcFd == -1) {
throwRuntimeException(env, "Unable to obtain the fd from the FileDescriptor");
return -1;
}
ssize_t res;
off_t offset = off;
int err;
do {
res = sendfile(fd, srcFd, &offset, (size_t) len);
} while (res == -1 && ((err = errno) == EINTR));
if (res < 0) {
if (err == EAGAIN) {
return 0;
}
throwIOException(env, exceptionMessage("Error during accept(...): ", err));
return -1;
}
if (res > 0) {
// update the transfered field in DefaultFileRegion
(*env)->SetLongField(env, fileRegion, transferedFieldId, off + res);
}
return res;
}
JNIEXPORT jobject JNICALL Java_io_netty_channel_epoll_Native_remoteAddress(JNIEnv * env, jclass clazz, jint fd) {
socklen_t len;
struct sockaddr_storage addr;
len = sizeof addr;
if (getpeername(fd, (struct sockaddr*)&addr, &len) == -1) {
return NULL;
}
return createInetSocketAddress(env, addr);
}
JNIEXPORT jobject JNICALL Java_io_netty_channel_epoll_Native_localAddress(JNIEnv * env, jclass clazz, jint fd) {
socklen_t len;
struct sockaddr_storage addr;
len = sizeof addr;
if (getsockname(fd, (struct sockaddr*)&addr, &len) == -1) {
return NULL;
}
return createInetSocketAddress(env, addr);
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setReuseAddress(JNIEnv * env, jclass clazz, jint fd, jint optval) {
setOption(env, fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpNoDelay(JNIEnv *env, jclass clazz, jint fd, jint optval) {
setOption(env, fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval));
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setReceiveBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval) {
setOption(env, fd, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval));
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setSendBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval) {
setOption(env, fd, SOL_SOCKET, SO_SNDBUF, &optval, sizeof(optval));
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setKeepAlive(JNIEnv *env, jclass clazz, jint fd, jint optval) {
setOption(env, fd, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval));
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv *env, jclass clazz, jint fd, jint optval) {
setOption(env, fd, SOL_TCP, TCP_CORK, &optval, sizeof(optval));
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setSoLinger(JNIEnv *env, jclass clazz, jint fd, jint optval) {
setOption(env, fd, IPPROTO_IP, IP_TOS, &optval, sizeof(optval));
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTrafficClass(JNIEnv *env, jclass clazz, jint fd, jint optval) {
struct linger solinger;
if (optval < 0) {
solinger.l_onoff = 0;
solinger.l_linger = 0;
} else {
solinger.l_onoff = 1;
solinger.l_linger = optval;
}
setOption(env, fd, SOL_SOCKET, SO_LINGER, &solinger, sizeof(solinger));
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isReuseAddresss(JNIEnv *env, jclass clazz, jint fd) {
int optval;
if (getOption(env, fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) == -1) {
return -1;
}
return optval;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isTcpNoDelay(JNIEnv *env, jclass clazz, jint fd) {
int optval;
if (getOption(env, fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)) == -1) {
return -1;
}
return optval;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getReceiveBufferSize(JNIEnv * env, jclass clazz, jint fd) {
int optval;
if (getOption(env, fd, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval)) == -1) {
return -1;
}
return optval;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getSendBufferSize(JNIEnv *env, jclass clazz, jint fd) {
int optval;
if (getOption(env, fd, SOL_SOCKET, SO_SNDBUF, &optval, sizeof(optval)) == -1) {
return -1;
}
return optval;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isTcpCork(JNIEnv *env, jclass clazz, jint fd) {
int optval;
if (getOption(env, fd, SOL_TCP, TCP_CORK, &optval, sizeof(optval)) == -1) {
return -1;
}
return optval;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getSoLinger(JNIEnv *env, jclass clazz, jint fd) {
struct linger optval;
if (getOption(env, fd, SOL_SOCKET, SO_LINGER, &optval, sizeof(optval)) == -1) {
return -1;
}
if (optval.l_onoff == 0) {
return -1;
} else {
return optval.l_linger;
}
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv *env, jclass clazz, jint fd) {
int optval;
if (getOption(env, fd, IPPROTO_IP, IP_TOS, &optval, sizeof(optval)) == -1) {
return -1;
}
return optval;
}

View File

@ -0,0 +1,60 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
#include <jni.h>
#define EPOLL_READ 0x01
#define EPOLL_WRITE 0x02
#define EPOLL_ACCEPT 0x04
#define EPOLL_RDHUP 0x08
jint Java_io_netty_channel_epoll_Native_eventFd(JNIEnv * env, jclass clazz);
void Java_io_netty_channel_epoll_Native_eventFdWrite(JNIEnv * env, jclass clazz, jint fd, jlong value);
void Java_io_netty_channel_epoll_Native_eventFdRead(JNIEnv * env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_epollCreate(JNIEnv * env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_epollWait(JNIEnv * env, jclass clazz, jint efd, jlongArray events, jint timeout);
void Java_io_netty_channel_epoll_Native_epollCtlAdd(JNIEnv * env, jclass clazz, jint efd, jint fd, jint flags, jint id);
void Java_io_netty_channel_epoll_Native_epollCtlMod(JNIEnv * env, jclass clazz, jint efd, jint fd, jint flags, jint id);
void Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv * env, jclass clazz, jint efd, jint fd);
jint Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit);
jlong Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length);
jint Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit);
void JNICALL Java_io_netty_channel_epoll_Native_close(JNIEnv * env, jclass clazz, jint fd);
void Java_io_netty_channel_epoll_Native_shutdown(JNIEnv * env, jclass clazz, jint fd, jboolean read, jboolean write);
jint Java_io_netty_channel_epoll_Native_socket(JNIEnv * env, jclass clazz);
void Java_io_netty_channel_epoll_Native_bind(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port);
void Java_io_netty_channel_epoll_Native_listen(JNIEnv * env, jclass clazz, jint fd, jint backlog);
jboolean Java_io_netty_channel_epoll_Native_connect(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port);
void Java_io_netty_channel_epoll_Native_finishConnect(JNIEnv * env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_accept(JNIEnv * env, jclass clazz, jint fd);
jlong Java_io_netty_channel_epoll_Native_sendfile(JNIEnv *env, jclass clazz, jint fd, jobject fileRegion, jlong off, jlong len);
jobject Java_io_netty_channel_epoll_Native_remoteAddress(JNIEnv * env, jclass clazz, jint fd);
jobject Java_io_netty_channel_epoll_Native_localAddress(JNIEnv * env, jclass clazz, jint fd);
void Java_io_netty_channel_epoll_Native_setReuseAddress(JNIEnv * env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setTcpNoDelay(JNIEnv *env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setReceiveBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setSendBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setKeepAlive(JNIEnv *env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv *env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setSoLinger(JNIEnv *env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setTrafficClass(JNIEnv *env, jclass clazz, jint fd, jint optval);
jint Java_io_netty_channel_epoll_Native_isReuseAddresss(JNIEnv *env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_isTcpNoDelay(JNIEnv *env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_getReceiveBufferSize(JNIEnv * env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_getSendBufferSize(JNIEnv *env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_isTcpCork(JNIEnv *env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_getSoLinger(JNIEnv *env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv *env, jclass clazz, jint fd);

View File

@ -0,0 +1,163 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.EventLoop;
import java.io.IOException;
import java.net.InetSocketAddress;
abstract class AbstractEpollChannel extends AbstractChannel {
private static final ChannelMetadata DATA = new ChannelMetadata(false);
private final int readFlag;
protected int flags;
protected volatile boolean active;
volatile int fd;
int id;
AbstractEpollChannel(Channel parent, int fd, int flag) {
super(parent);
this.fd = fd;
readFlag = flag;
flags |= flag;
}
AbstractEpollChannel(int flag) {
this(null, socketFd(), flag);
}
private static int socketFd() {
try {
return Native.socket();
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public boolean isActive() {
return active;
}
@Override
public ChannelMetadata metadata() {
return DATA;
}
@Override
protected void doClose() throws Exception {
active = false;
int fd = this.fd;
this.fd = -1;
Native.close(fd);
}
@Override
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();
}
@Override
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override
protected void doDisconnect() throws Exception {
doClose();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof EpollEventLoop;
}
@Override
public boolean isOpen() {
return fd != -1;
}
@Override
protected void doDeregister() throws Exception {
((EpollEventLoop) eventLoop()).remove(this);
}
@Override
protected void doBeginRead() throws Exception {
if ((flags & readFlag) == 0) {
flags |= readFlag;
((EpollEventLoop) eventLoop()).modify(this);
}
}
protected final void clearEpollIn() {
if ((flags & readFlag) != 0) {
flags = ~readFlag;
((EpollEventLoop) eventLoop()).modify(this);
}
}
@Override
protected void doRegister() throws Exception {
EpollEventLoop loop = (EpollEventLoop) eventLoop();
loop.add(this);
}
@Override
protected abstract AbstractEpollUnsafe newUnsafe();
protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
/**
* Called once EPOLLIN event is ready to be processed
*/
abstract void epollInReady();
/**
* Called once EPOLLRDHUP event is ready to be processed
*/
void epollRdHupReady() {
// NOOP
}
@Override
protected void flush0() {
// Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now.
if (isFlushPending()) {
return;
}
super.flush0();
}
/**
* Called once a EPOLLOUT event is ready to be processed
*/
void epollOutReady() {
// directly call super.flush0() to force a flush now
super.flush0();
}
private boolean isFlushPending() {
return (flags & Native.EPOLLOUT) != 0;
}
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.channel.ChannelOption;
public final class EpollChannelOption {
private static final Class<EpollChannelOption> T = EpollChannelOption.class;
public static final ChannelOption<Boolean> TCP_CORK = ChannelOption.valueOf(T, "TCP_CORK");
private EpollChannelOption() { }
}

View File

@ -0,0 +1,356 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
* {@link EventLoop} which uses epoll under the covers. Only works on Linux!
*/
final class EpollEventLoop extends SingleThreadEventLoop {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);
private static final AtomicIntegerFieldUpdater<EpollEventLoop> WAKEN_UP_UPDATER;
static {
AtomicIntegerFieldUpdater<EpollEventLoop> updater =
PlatformDependent.newAtomicIntegerFieldUpdater(EpollEventLoop.class, "wakenUp");
if (updater == null) {
updater = AtomicIntegerFieldUpdater.newUpdater(EpollEventLoop.class, "wakenUp");
}
WAKEN_UP_UPDATER = updater;
}
private final int epollFd;
private final int eventFd;
private final Map<Integer, AbstractEpollChannel> ids = new HashMap<Integer, AbstractEpollChannel>();
private final long[] events;
private int id;
private int oldWakenUp;
private boolean overflown;
@SuppressWarnings("unused")
private volatile int wakenUp;
private volatile int ioRatio = 50;
EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents) {
super(parent, executor, false);
events = new long[maxEvents];
boolean success = false;
int epollFd = -1;
int eventFd = -1;
try {
this.epollFd = epollFd = Native.epollCreate();
this.eventFd = eventFd = Native.eventFd();
Native.epollCtlAdd(epollFd, eventFd, Native.EPOLLIN, 0);
success = true;
} finally {
if (!success) {
if (epollFd != -1) {
try {
Native.close(epollFd);
} catch (Exception e) {
// ignore
}
}
if (eventFd != -1) {
try {
Native.close(eventFd);
} catch (Exception e) {
// ignore
}
}
}
}
}
private int nextId() {
int id = this.id;
if (id == Integer.MAX_VALUE) {
overflown = true;
id = 0;
}
if (overflown) {
// the ids had an overflow before so we need to make sure the id is not in use atm before assign
// it.
for (;;) {
if (!ids.containsKey(++id)) {
this.id = id;
break;
}
}
} else {
this.id = ++id;
}
return id;
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
// write to the evfd which will then wake-up epoll_wait(...)
Native.eventFdWrite(eventFd, 1L);
}
}
/**
* Register the given epoll with this {@link io.netty.channel.EventLoop}.
*/
void add(AbstractEpollChannel ch) {
assert inEventLoop();
int id = nextId();
Native.epollCtlAdd(epollFd, ch.fd, ch.flags, id);
ch.id = id;
ids.put(id, ch);
}
/**
* The flags of the given epoll was modified so update the registration
*/
void modify(AbstractEpollChannel ch) {
assert inEventLoop();
Native.epollCtlMod(epollFd, ch.fd, ch.flags, ch.id);
}
/**
* Deregister the given epoll from this {@link io.netty.channel.EventLoop}.
*/
void remove(AbstractEpollChannel ch) {
assert inEventLoop();
if (ids.remove(ch.id) != null && ch.isOpen()) {
// Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
// removed once the file-descriptor is closed.
Native.epollCtlDel(epollFd, ch.fd);
}
}
@Override
protected Queue<Runnable> newTaskQueue() {
// This event loop never calls takeTask()
return new ConcurrentLinkedQueue<Runnable>();
}
/**
* Returns the percentage of the desired amount of time spent for I/O in the event loop.
*/
public int getIoRatio() {
return ioRatio;
}
/**
* Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is
* {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
*/
public void setIoRatio(int ioRatio) {
if (ioRatio <= 0 || ioRatio > 100) {
throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
}
this.ioRatio = ioRatio;
}
private int epollWait() {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
int ready = Native.epollWait(epollFd, events, 0);
if (ready > 0) {
return ready;
}
}
break;
}
int selectedKeys = Native.epollWait(epollFd, events, (int) timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp == 1 || wakenUp == 1 || hasTasks()) {
// Selected something,
// waken up by user, or
// the task queue has a pending task.
return selectedKeys;
}
currentTimeNanos = System.nanoTime();
}
return 0;
}
@Override
protected void run() {
for (;;) {
oldWakenUp = WAKEN_UP_UPDATER.getAndSet(this, 0);
try {
int ready;
if (hasTasks()) {
// Non blocking just return what is ready directly without block
ready = Native.epollWait(epollFd, events, 0);
} else {
ready = epollWait();
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp == 1) {
Native.eventFdWrite(eventFd, 1L);
}
}
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
if (ready > 0) {
processReady(events, ready);
}
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
if (ready > 0) {
processReady(events, ready);
}
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}
private void closeAll() {
int ready = Native.epollWait(epollFd, events, 0);
Collection<AbstractEpollChannel> channels = new ArrayList<AbstractEpollChannel>(ready);
for (int i = 0; i < ready; i++) {
final long ev = events[i];
int id = (int) (ev >> 32L);
AbstractEpollChannel ch = ids.get(id);
if (ch != null) {
channels.add(ids.get(id));
}
}
for (AbstractEpollChannel ch: channels) {
ch.unsafe().close(ch.unsafe().voidPromise());
}
}
private void processReady(long[] events, int ready) {
for (int i = 0; i < ready; i ++) {
final long ev = events[i];
int id = (int) (ev >> 32L);
if (id == 0) {
// consume wakeup event
Native.eventFdRead(eventFd);
} else {
boolean read = (ev & Native.EPOLLIN) != 0;
boolean write = (ev & Native.EPOLLOUT) != 0;
boolean close = (ev & Native.EPOLLRDHUP) != 0;
AbstractEpollChannel ch = ids.get(id);
if (ch != null) {
AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
if (write) {
// force flush of data as the epoll is writable again
unsafe.epollOutReady();
}
if (read) {
// Something is ready to read, so consume it now
unsafe.epollInReady();
}
if (close) {
unsafe.epollRdHupReady();
}
}
}
}
}
@Override
protected void cleanup() {
try {
Native.close(epollFd);
} catch (IOException e) {
logger.warn("Failed to close the epoll fd.", e);
}
try {
Native.close(eventFd);
} catch (IOException e) {
logger.warn("Failed to close the event fd.", e);
}
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
/**
* {@link EventLoopGroup} which uses epoll under the covers. Because of this
* it only works on linux.
*/
public final class EpollEventLoopGroup extends MultithreadEventLoopGroup {
/**
* Create a new instance using the default number of threads and the default {@link ThreadFactory}.
*/
public EpollEventLoopGroup() {
this(0);
}
/**
* Create a new instance using the specified number of threads and the default {@link ThreadFactory}.
*/
public EpollEventLoopGroup(int nThreads) {
this(nThreads, null);
}
/**
* Create a new instance using the specified number of threads and the given {@link ThreadFactory}.
*/
public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, 128);
}
/**
* Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given
* maximal amount of epoll events to handle per epollWait(...).
*/
public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce) {
super(nThreads, threadFactory, maxEventsAtOnce);
}
/**
* Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is
* {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
*/
public void setIoRatio(int ioRatio) {
for (EventExecutor e: children()) {
((EpollEventLoop) e).setIoRatio(ioRatio);
}
}
@Override
protected EventExecutor newChild(Executor executor, Object... args) throws Exception {
return new EpollEventLoop(this, executor, (Integer) args[0]);
}
}

View File

@ -0,0 +1,124 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.ServerSocketChannelConfig;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
/**
* {@link ServerSocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
* maximal performance.
*/
public final class EpollServerSocketChannel extends AbstractEpollChannel implements ServerSocketChannel {
private final EpollServerSocketChannelConfig config;
private volatile InetSocketAddress local;
public EpollServerSocketChannel() {
super(Native.EPOLLACCEPT);
config = new EpollServerSocketChannelConfig(this);
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof EpollEventLoop;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
InetSocketAddress addr = (InetSocketAddress) localAddress;
Native.bind(fd, addr.getAddress(), addr.getPort());
local = addr;
Native.listen(fd, config.getBacklog());
active = true;
}
@Override
public ServerSocketChannelConfig config() {
return config;
}
@Override
protected InetSocketAddress localAddress0() {
return local;
}
@Override
protected InetSocketAddress remoteAddress0() {
return null;
}
@Override
protected AbstractEpollUnsafe newUnsafe() {
return new EpollServerSocketUnsafe();
}
@Override
protected void doWrite(ChannelOutboundBuffer in) {
throw new UnsupportedOperationException();
}
final class EpollServerSocketUnsafe extends AbstractEpollUnsafe {
@Override
public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) {
// Connect not supported by ServerChannel implementations
channelPromise.setFailure(new UnsupportedOperationException());
}
@Override
void epollInReady() {
assert eventLoop().inEventLoop();
try {
final ChannelPipeline pipeline = pipeline();
Throwable exception = null;
try {
for (;;) {
int socketFd = Native.accept(fd);
if (socketFd == -1) {
// this means everything was handled for now
break;
}
try {
pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, socketFd));
} catch (Throwable t) {
// keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
}
}
} catch (Throwable t) {
exception = t;
}
pipeline.fireChannelReadComplete();
if (exception != null) {
pipeline.fireExceptionCaught(exception);
}
} finally {
if (!config().isAutoRead()) {
clearEpollIn();
}
}
}
}
}

View File

@ -0,0 +1,176 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ServerSocketChannelConfig;
import io.netty.util.NetUtil;
import java.util.Map;
import static io.netty.channel.ChannelOption.SO_BACKLOG;
import static io.netty.channel.ChannelOption.SO_RCVBUF;
import static io.netty.channel.ChannelOption.SO_REUSEADDR;
final class EpollServerSocketChannelConfig extends DefaultChannelConfig
implements ServerSocketChannelConfig {
private final EpollServerSocketChannel channel;
private volatile int backlog = NetUtil.SOMAXCONN;
EpollServerSocketChannelConfig(EpollServerSocketChannel channel) {
super(channel);
this.channel = channel;
}
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG);
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_RCVBUF) {
return (T) Integer.valueOf(getReceiveBufferSize());
}
if (option == SO_REUSEADDR) {
return (T) Boolean.valueOf(isReuseAddress());
}
if (option == SO_BACKLOG) {
return (T) Integer.valueOf(getBacklog());
}
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == SO_RCVBUF) {
setReceiveBufferSize((Integer) value);
} else if (option == SO_REUSEADDR) {
setReuseAddress((Boolean) value);
} else if (option == SO_BACKLOG) {
setBacklog((Integer) value);
} else {
return super.setOption(option, value);
}
return true;
}
@Override
public boolean isReuseAddress() {
return Native.isReuseAddress(channel.fd) == 1;
}
@Override
public ServerSocketChannelConfig setReuseAddress(boolean reuseAddress) {
Native.setReuseAddress(channel.fd, reuseAddress ? 1 : 0);
return this;
}
@Override
public int getReceiveBufferSize() {
return Native.getReceiveBufferSize(channel.fd);
}
@Override
public ServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
Native.setReceiveBufferSize(channel.fd, receiveBufferSize);
return this;
}
@Override
public ServerSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
return this;
}
@Override
public int getBacklog() {
return backlog;
}
@Override
public ServerSocketChannelConfig setBacklog(int backlog) {
if (backlog < 0) {
throw new IllegalArgumentException("backlog: " + backlog);
}
this.backlog = backlog;
return this;
}
@Override
public ServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
}
@Override
public ServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public ServerSocketChannelConfig setWriteSpinCount(int writeSpinCount) {
super.setWriteSpinCount(writeSpinCount);
return this;
}
@Override
public ServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) {
super.setAllocator(allocator);
return this;
}
@Override
public ServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
super.setRecvByteBufAllocator(allocator);
return this;
}
@Override
public ServerSocketChannelConfig setAutoRead(boolean autoRead) {
super.setAutoRead(autoRead);
return this;
}
@Override
public ServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
@Override
public ServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
@Override
public ServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
}
}

View File

@ -0,0 +1,590 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.internal.StringUtil;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* {@link SocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
* maximal performance.
*/
public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel {
private final EpollSocketChannelConfig config;
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
private volatile boolean inputShutdown;
private volatile boolean outputShutdown;
EpollSocketChannel(Channel parent, int fd) {
super(parent, fd, Native.EPOLLIN);
active = true;
config = new EpollSocketChannelConfig(this);
}
public EpollSocketChannel() {
super(Native.EPOLLIN);
config = new EpollSocketChannelConfig(this);
}
@Override
protected AbstractEpollUnsafe newUnsafe() {
return new EpollSocketUnsafe();
}
@Override
protected SocketAddress localAddress0() {
return Native.localAddress(fd);
}
@Override
protected SocketAddress remoteAddress0() {
return Native.remoteAddress(fd);
}
@Override
protected void doBind(SocketAddress local) throws Exception {
InetSocketAddress localAddress = (InetSocketAddress) local;
Native.bind(fd, localAddress.getAddress(), localAddress.getPort());
}
private void setEpollOut() {
if ((flags & Native.EPOLLOUT) == 0) {
flags |= Native.EPOLLOUT;
((EpollEventLoop) eventLoop()).modify(this);
}
}
private void clearEpollOut() {
if ((flags & Native.EPOLLOUT) != 0) {
flags = ~Native.EPOLLOUT;
((EpollEventLoop) eventLoop()).modify(this);
}
}
/**
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
* @param buf the {@link ByteBuf} from which the bytes should be written
* @return amount the amount of written bytes
*/
private int doWriteBytes(ByteBuf buf, int readable) throws Exception {
int readerIndex = buf.readerIndex();
int localFlushedAmount;
if (buf.nioBufferCount() == 1) {
ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, readable);
localFlushedAmount = Native.write(fd, nioBuf, nioBuf.position(), nioBuf.limit());
} else {
// backed by more then one buffer, do a gathering write...
ByteBuffer[] nioBufs = buf.nioBuffers();
localFlushedAmount = (int) Native.writev(fd, nioBufs, 0, nioBufs.length);
}
if (localFlushedAmount > 0) {
buf.readerIndex(readerIndex + localFlushedAmount);
}
return localFlushedAmount;
}
/**
* Write a {@link DefaultFileRegion}
*
* @param region the {@link DefaultFileRegion} from which the bytes should be written
* @return amount the amount of written bytes
*/
private long doWriteFileRegion(DefaultFileRegion region, long count) throws Exception {
return Native.sendfile(fd, region, region.transfered(), count);
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
final int msgCount = in.size();
if (msgCount == 0) {
// Wrote all messages.
clearEpollOut();
break;
}
// Do non-gathering write for a single buffer case.
if (msgCount > 1) {
// Ensure the pending writes are made of ByteBufs only.
ByteBuffer[] nioBuffers = in.nioBuffers();
if (nioBuffers != null) {
int nioBufferCnt = in.nioBufferCount();
long expectedWrittenBytes = in.nioBufferSize();
long localWrittenBytes = Native.writev(fd, nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes < expectedWrittenBytes) {
int nioBufIndex = 0;
setEpollOut();
// Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer.
for (int i = msgCount; i > 0; i --) {
final ByteBuf buf = (ByteBuf) in.current();
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes < localWrittenBytes) {
nioBufIndex += buf.nioBufferCount();
in.remove();
localWrittenBytes -= readableBytes;
} else if (readableBytes > localWrittenBytes) {
buf.readerIndex(readerIndex + (int) localWrittenBytes);
in.progress(localWrittenBytes);
// update position in ByteBuffer as we not do this in the native methods
ByteBuffer bb = nioBuffers[nioBufIndex];
bb.position(bb.position() + (int) localWrittenBytes);
break;
} else { // readable == writtenBytes
in.remove();
break;
}
}
} else {
// Release all buffers
for (int i = msgCount; i > 0; i --) {
in.remove();
}
}
// try again as a ChannelFuture may be notified in the meantime and triggered another flush
continue;
}
}
Object msg = in.current();
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
continue;
}
int expected = buf.readableBytes();
int localFlushedAmount = doWriteBytes(buf, expected);
in.progress(localFlushedAmount);
if (localFlushedAmount < expected) {
setEpollOut();
break;
}
if (!buf.isReadable()) {
in.remove();
}
} else if (msg instanceof DefaultFileRegion) {
DefaultFileRegion region = (DefaultFileRegion) msg;
long expected = region.count() - region.position();
long localFlushedAmount = doWriteFileRegion(region, expected);
in.progress(localFlushedAmount);
if (localFlushedAmount < expected) {
setEpollOut();
break;
}
if (region.transfered() >= region.count()) {
in.remove();
}
} else {
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
}
}
}
@Override
public EpollSocketChannelConfig config() {
return config;
}
@Override
public boolean isInputShutdown() {
return inputShutdown;
}
@Override
public boolean isOutputShutdown() {
return outputShutdown || !isActive();
}
@Override
public ChannelFuture shutdownOutput() {
return shutdownOutput(newPromise());
}
@Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
try {
Native.shutdown(fd, false, true);
outputShutdown = true;
promise.setSuccess();
} catch (Throwable t) {
promise.setFailure(t);
}
} else {
loop.execute(new Runnable() {
@Override
public void run() {
shutdownOutput(promise);
}
});
}
return promise;
}
@Override
public ServerSocketChannel parent() {
return (ServerSocketChannel) super.parent();
}
final class EpollSocketUnsafe extends AbstractEpollUnsafe {
private RecvByteBufAllocator.Handle allocHandle;
@Override
public void write(Object msg, ChannelPromise promise) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.isDirect()) {
// We can only handle direct buffers so we need to copy if a non direct is
// passed to write.
int readable = buf.readableBytes();
ByteBuf dst = alloc().directBuffer(readable);
dst.writeBytes(buf, buf.readerIndex(), readable);
buf.release();
msg = dst;
}
}
super.write(msg, promise);
}
private void closeOnRead(ChannelPipeline pipeline) {
inputShutdown = true;
if (isOpen()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
clearEpollIn();
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidPromise());
}
}
}
private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
}
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) {
closeOnRead(pipeline);
return true;
}
return false;
}
@Override
public void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
throw new IllegalStateException("connection attempt already made");
}
boolean wasActive = isActive();
if (doConnect((InetSocketAddress) remoteAddress, (InetSocketAddress) localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = EpollSocketChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
if (t instanceof ConnectException) {
Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
newT.setStackTrace(t.getStackTrace());
t = newT;
}
closeIfClosed();
promise.tryFailure(t);
}
}
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
active = true;
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
}
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(cause);
closeIfClosed();
}
private void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
assert eventLoop().inEventLoop();
try {
boolean wasActive = isActive();
doFinishConnect();
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
if (t instanceof ConnectException) {
Throwable newT = new ConnectException(t.getMessage() + ": " + requestedRemoteAddress);
newT.setStackTrace(t.getStackTrace());
t = newT;
}
fulfillConnectPromise(connectPromise, t);
} finally {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
@Override
void epollOutReady() {
if (connectPromise != null) {
// pending connect which is now complete so handle it.
finishConnect();
} else {
super.epollOutReady();
}
}
/**
* Connect to the remote peer
*/
private boolean doConnect(InetSocketAddress remoteAddress, InetSocketAddress localAddress) throws Exception {
if (localAddress != null) {
Native.bind(fd, localAddress.getAddress(), localAddress.getPort());
}
boolean success = false;
try {
boolean connected = Native.connect(fd, remoteAddress.getAddress(),
remoteAddress.getPort());
if (!connected) {
setEpollOut();
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
/**
* Finish the connect
*/
private void doFinishConnect() throws Exception {
Native.finishConnect(fd);
clearEpollOut();
}
/**
* Read bytes into the given {@link ByteBuf} and return the amount.
*/
private int doReadBytes(ByteBuf byteBuf) throws Exception {
ByteBuffer buf = byteBuf.internalNioBuffer(0, byteBuf.writableBytes());
int localReadAmount = Native.read(fd, buf, buf.position(), buf.limit());
if (localReadAmount > 0) {
byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount);
}
return localReadAmount;
}
@Override
void epollRdHupReady() {
closeOnRead(pipeline());
}
@Override
void epollInReady() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
ByteBuf byteBuf = null;
boolean close = false;
try {
int byteBufCapacity = allocHandle.guess();
int totalReadAmount = 0;
for (;;) {
// we use a direct buffer here as the native implementations only be able
// to handle direct buffers.
byteBuf = allocator.directBuffer(byteBufCapacity);
int writable = byteBuf.writableBytes();
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount <= 0) {
// not was read release the buffer
byteBuf.release();
close = localReadAmount < 0;
break;
}
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
allocHandle.record(totalReadAmount);
// Avoid overflow.
totalReadAmount = localReadAmount;
} else {
totalReadAmount += localReadAmount;
}
if (localReadAmount < writable) {
// Read less than what the buffer can hold,
// which might mean we drained the recv buffer completely.
break;
}
}
pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
if (close) {
closeOnRead(pipeline);
close = false;
}
} catch (Throwable t) {
boolean closed = handleReadException(pipeline, byteBuf, t, close);
if (!closed) {
// trigger a read again as there may be something left to read and because of epoll ET we
// will not get notified again until we read everything from the socket
eventLoop().execute(new Runnable() {
@Override
public void run() {
epollInReady();
}
});
}
} finally {
if (!config.isAutoRead()) {
clearEpollIn();
}
}
}
}
}

View File

@ -0,0 +1,280 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.util.internal.PlatformDependent;
import java.util.Map;
import static io.netty.channel.ChannelOption.*;
public final class EpollSocketChannelConfig extends DefaultChannelConfig
implements SocketChannelConfig {
protected final EpollSocketChannel channel;
private volatile boolean allowHalfClosure;
/**
* Creates a new instance.
*/
EpollSocketChannelConfig(EpollSocketChannel channel) {
super(channel);
this.channel = channel;
if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
setTcpNoDelay(true);
}
}
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(
super.getOptions(),
SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS,
ALLOW_HALF_CLOSURE, EpollChannelOption.TCP_CORK);
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_RCVBUF) {
return (T) Integer.valueOf(getReceiveBufferSize());
}
if (option == SO_SNDBUF) {
return (T) Integer.valueOf(getSendBufferSize());
}
if (option == TCP_NODELAY) {
return (T) Boolean.valueOf(isTcpNoDelay());
}
if (option == SO_KEEPALIVE) {
return (T) Boolean.valueOf(isKeepAlive());
}
if (option == SO_REUSEADDR) {
return (T) Boolean.valueOf(isReuseAddress());
}
if (option == SO_LINGER) {
return (T) Integer.valueOf(getSoLinger());
}
if (option == IP_TOS) {
return (T) Integer.valueOf(getTrafficClass());
}
if (option == ALLOW_HALF_CLOSURE) {
return (T) Boolean.valueOf(isAllowHalfClosure());
}
if (option == EpollChannelOption.TCP_CORK) {
return (T) Boolean.valueOf(isTcpCork());
}
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == SO_RCVBUF) {
setReceiveBufferSize((Integer) value);
} else if (option == SO_SNDBUF) {
setSendBufferSize((Integer) value);
} else if (option == TCP_NODELAY) {
setTcpNoDelay((Boolean) value);
} else if (option == SO_KEEPALIVE) {
setKeepAlive((Boolean) value);
} else if (option == SO_REUSEADDR) {
setReuseAddress((Boolean) value);
} else if (option == SO_LINGER) {
setSoLinger((Integer) value);
} else if (option == IP_TOS) {
setTrafficClass((Integer) value);
} else if (option == ALLOW_HALF_CLOSURE) {
setAllowHalfClosure((Boolean) value);
} else if (option == EpollChannelOption.TCP_CORK) {
setTcpCork((Boolean) value);
} else {
return super.setOption(option, value);
}
return true;
}
@Override
public int getReceiveBufferSize() {
return Native.getReceiveBufferSize(channel.fd);
}
@Override
public int getSendBufferSize() {
return Native.getSendBufferSize(channel.fd);
}
@Override
public int getSoLinger() {
return Native.getSoLinger(channel.fd);
}
@Override
public int getTrafficClass() {
return Native.getTrafficClass(channel.fd);
}
@Override
public boolean isKeepAlive() {
return Native.isKeepAlive(channel.fd) == 1;
}
@Override
public boolean isReuseAddress() {
return Native.isReuseAddress(channel.fd) == 1;
}
@Override
public boolean isTcpNoDelay() {
return Native.isTcpNoDelay(channel.fd) == 1;
}
public boolean isTcpCork() {
return Native.isTcpCork(channel.fd) == 1;
}
@Override
public EpollSocketChannelConfig setKeepAlive(boolean keepAlive) {
Native.setKeepAlive(channel.fd, keepAlive ? 1 : 0);
return this;
}
@Override
public EpollSocketChannelConfig setPerformancePreferences(
int connectionTime, int latency, int bandwidth) {
return this;
}
@Override
public EpollSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
Native.setReceiveBufferSize(channel.fd, receiveBufferSize);
return this;
}
@Override
public EpollSocketChannelConfig setReuseAddress(boolean reuseAddress) {
Native.setReuseAddress(channel.fd, reuseAddress ? 1 : 0);
return this;
}
@Override
public EpollSocketChannelConfig setSendBufferSize(int sendBufferSize) {
Native.setSendBufferSize(channel.fd, sendBufferSize);
return this;
}
@Override
public EpollSocketChannelConfig setSoLinger(int soLinger) {
Native.setSoLinger(channel.fd, soLinger);
return this;
}
@Override
public EpollSocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) {
Native.setTcpNoDelay(channel.fd, tcpNoDelay ? 1 : 0);
return this;
}
public EpollSocketChannelConfig setTcpCork(boolean tcpCork) {
Native.setTcpCork(channel.fd, tcpCork ? 1 : 0);
return this;
}
@Override
public EpollSocketChannelConfig setTrafficClass(int trafficClass) {
Native.setTrafficClass(channel.fd, trafficClass);
return this;
}
@Override
public boolean isAllowHalfClosure() {
return allowHalfClosure;
}
@Override
public EpollSocketChannelConfig setAllowHalfClosure(boolean allowHalfClosure) {
this.allowHalfClosure = allowHalfClosure;
return this;
}
@Override
public EpollSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
}
@Override
public EpollSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public EpollSocketChannelConfig setWriteSpinCount(int writeSpinCount) {
super.setWriteSpinCount(writeSpinCount);
return this;
}
@Override
public EpollSocketChannelConfig setAllocator(ByteBufAllocator allocator) {
super.setAllocator(allocator);
return this;
}
@Override
public EpollSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
super.setRecvByteBufAllocator(allocator);
return this;
}
@Override
public EpollSocketChannelConfig setAutoRead(boolean autoRead) {
super.setAutoRead(autoRead);
return this;
}
@Override
public EpollSocketChannelConfig setAutoClose(boolean autoClose) {
super.setAutoClose(autoClose);
return this;
}
@Override
public EpollSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
@Override
public EpollSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
@Override
public EpollSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
}
}

View File

@ -0,0 +1,136 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.channel.DefaultFileRegion;
import io.netty.util.internal.NativeLibraryLoader;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Locale;
/**
* Native helper methods
*
* <strong>Internal usage only!</strong>
*/
final class Native {
private static final byte[] IPV4_MAPPED_IPV6_PREFIX = {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, (byte) 0xff, (byte) 0xff };
static {
String name = System.getProperty("os.name").toLowerCase(Locale.UK).trim();
if (!name.startsWith("linux")) {
throw new IllegalStateException("Only supported on Linux");
}
NativeLibraryLoader.load("netty-transport-native-epoll", Native.class.getClassLoader());
}
// EventLoop operations and constants
public static final int EPOLLIN = 0x01;
public static final int EPOLLOUT = 0x02;
public static final int EPOLLACCEPT = 0x04;
public static final int EPOLLRDHUP = 0x08;
public static native int eventFd();
public static native void eventFdWrite(int fd, long value);
public static native void eventFdRead(int fd);
public static native int epollCreate();
public static native int epollWait(int efd, long[] events, int timeout);
public static native void epollCtlAdd(int efd, final int fd, final int flags, final int id);
public static native void epollCtlMod(int efd, final int fd, final int flags, final int id);
public static native void epollCtlDel(int efd, final int fd);
// File-descriptor operations
public static native void close(int fd) throws IOException;
public static native int write(int fd, ByteBuffer buf, int pos, int limit) throws IOException;
public static native long writev(int fd, ByteBuffer[] buffers, int offset, int length) throws IOException;
public static native int read(int fd, ByteBuffer buf, int pos, int limit) throws IOException;
public static native long sendfile(int dest, DefaultFileRegion src, long offset, long length) throws IOException;
// socket operations
public static native int socket() throws IOException;
public static void bind(int fd, InetAddress addr, int port) throws IOException {
byte[] address;
int scopeId;
if (addr instanceof Inet6Address) {
address = addr.getAddress();
scopeId = ((Inet6Address) addr).getScopeId();
} else {
// convert to ipv4 mapped ipv6 address;
scopeId = 0;
address = ipv4MappedIpv6Address(addr.getAddress());
}
bind(fd, address, scopeId, port);
}
private static byte[] ipv4MappedIpv6Address(byte[] ipv4) {
byte[] address = new byte[16];
System.arraycopy(IPV4_MAPPED_IPV6_PREFIX, 0, address, 0, IPV4_MAPPED_IPV6_PREFIX.length);
System.arraycopy(ipv4, 0, address, 12, ipv4.length);
return address;
}
public static native void bind(int fd, byte[] address, int scopeId, int port) throws IOException;
public static native void listen(int fd, int backlog) throws IOException;
public static boolean connect(int fd, InetAddress addr, int port) throws IOException {
byte[] address;
int scopeId;
if (addr instanceof Inet6Address) {
address = addr.getAddress();
scopeId = ((Inet6Address) addr).getScopeId();
} else {
// convert to ipv4 mapped ipv6 address;
scopeId = 0;
address = ipv4MappedIpv6Address(addr.getAddress());
}
return connect(fd, address, scopeId, port);
}
public static native boolean connect(int fd, byte[] address, int scopeId, int port) throws IOException;
public static native void finishConnect(int fd) throws IOException;
public static native InetSocketAddress remoteAddress(int fd);
public static native InetSocketAddress localAddress(int fd);
public static native int accept(int fd) throws IOException;
public static native void shutdown(int fd, boolean read, boolean write) throws IOException;
// Socket option operations
public static native int getReceiveBufferSize(int fd);
public static native int getSendBufferSize(int fd);
public static native int isKeepAlive(int fd);
public static native int isReuseAddress(int fd);
public static native int isTcpNoDelay(int fd);
public static native int isTcpCork(int fd);
public static native int getSoLinger(int fd);
public static native int getTrafficClass(int fd);
public static native void setKeepAlive(int fd, int keepAlive);
public static native void setReceiveBufferSize(int fd, int receiveBufferSize);
public static native void setReuseAddress(int fd, int reuseAddress);
public static native void setSendBufferSize(int fd, int sendBufferSize);
public static native void setTcpNoDelay(int fd, int tcpNoDelay);
public static native void setTcpCork(int fd, int tcpCork);
public static native void setSoLinger(int fd, int soLinger);
public static native void setTrafficClass(int fd, int tcpNoDelay);
private Native() {
// utility
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Optimized transport for linux which uses <a href="http://en.wikipedia.org/wiki/Epoll">EPOLL Edge-Triggered Mode</a>
* for maximal performance.
*/
package io.netty.channel.epoll;

View File

@ -0,0 +1,47 @@
dnl ---------------------------------------------------------------------------
dnl Copyright 2014 The Netty Project
dnl
dnl Licensed under the Apache License, Version 2.0 (the "License");
dnl you may not use this file except in compliance with the License.
dnl You may obtain a copy of the License at
dnl
dnl http://www.apache.org/licenses/LICENSE-2.0
dnl
dnl Unless required by applicable law or agreed to in writing, software
dnl distributed under the License is distributed on an "AS IS" BASIS,
dnl WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
dnl See the License for the specific language governing permissions and
dnl limitations under the License.
dnl ---------------------------------------------------------------------------
AC_DEFUN([CUSTOM_M4_SETUP],
[
AC_MSG_CHECKING(which arch to build for)
AC_ARG_WITH([arch],
[AS_HELP_STRING([--with-arch@<:@=ARCH@:>@],
[Build for the specified architecture. Pick from: i386, x86_64.])],
[
AS_IF(test -n "$withval", [
ARCH="$withval"
AC_MSG_RESULT([yes, archs: $ARCH])
])
],[
ARCH=""
AC_MSG_RESULT([no])
])
AS_IF(test "$ARCH" = "i386", [
FLAGS="-m32"
], test "ARCH" = "x86_64", [
FLAGS="-m64"
], [
FLAGS=""
])
AS_IF(test -n "$FLAGS", [
CFLAGS="$FLAGS $CFLAGS"
CXXFLAGS="$FLAGS $CXXFLAGS"
LDFLAGS="$FLAGS $ARCH $LDFLAGS"
AC_SUBST(CFLAGS)
AC_SUBST(CXXFLAGS)
AC_SUBST(LDFLAGS)
])
])

View File

@ -0,0 +1,31 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketEchoTest;
import java.util.List;
public class EpollSocketEchoTest extends SocketEchoTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollTestUtils.newFactories();
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketFileRegionTest;
import java.util.List;
public class EpollSocketFileRegionTest extends SocketFileRegionTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollTestUtils.newFactories();
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketFixedLengthEchoTest;
import java.util.List;
public class EpollSocketFixedLengthEchoTest extends SocketFixedLengthEchoTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollTestUtils.newFactories();
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketGatheringWriteTest;
import java.util.List;
public class EpollSocketGatheringWriteTest extends SocketGatheringWriteTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollTestUtils.newFactories();
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketObjectEchoTest;
import java.util.List;
public class EpollSocketObjectEchoTest extends SocketObjectEchoTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollTestUtils.newFactories();
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketSslEchoTest;
import java.util.List;
public class EpollSocketSslEchoTest extends SocketSslEchoTest {
public EpollSocketSslEchoTest(boolean serverUsesDelegatedTaskExecutor,
boolean clientUsesDelegatedTaskExecutor,
boolean useChunkedWriteHandler,
boolean useCompositeByteBuf) {
super(serverUsesDelegatedTaskExecutor, clientUsesDelegatedTaskExecutor,
useChunkedWriteHandler, useCompositeByteBuf);
}
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollTestUtils.newFactories();
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketStartTlsTest;
import java.util.List;
public class EpollSocketStartTlsTest extends SocketStartTlsTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollTestUtils.newFactories();
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketStringEchoTest;
import java.util.List;
public class EpollSocketStringEchoTest extends SocketStringEchoTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollTestUtils.newFactories();
}
}

View File

@ -0,0 +1,46 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.testsuite.transport.TestsuitePermutation;
import java.util.Collections;
import java.util.List;
final class EpollTestUtils {
private static final EventLoopGroup GROUP = new EpollEventLoopGroup();
static List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return Collections.<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>>singletonList(
new TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>() {
@Override
public ServerBootstrap newServerInstance() {
return new ServerBootstrap().group(GROUP).channel(EpollServerSocketChannel.class);
}
@Override
public Bootstrap newClientInstance() {
return new Bootstrap().group(GROUP).channel(EpollSocketChannel.class);
}
});
}
private EpollTestUtils() {
// utility class
}
}