[#2376] Add support for SO_REUSEPORT in native transport

Motivation:
In linux kernel 3.9 a new featured named SO_REUSEPORT was introduced which allows to have multiple sockets bind to the same port and so handle the accept() of new connections with multiple threads. This can greatly improve the performance when you not to accept a lot of connections.

Modifications:
Implement SO_REUSEPORT via JNI

Result:
Be able to use the SO_REUSEPORT feature when using the EpollServerSocketChannel
This commit is contained in:
Norman Maurer 2014-04-11 15:54:31 +02:00
parent 40bcb17bf9
commit 6615d72db0
8 changed files with 211 additions and 22 deletions

View File

@ -27,6 +27,7 @@
#include <unistd.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/utsname.h>
#include "io_netty_channel_epoll_Native.h"
@ -896,6 +897,10 @@ JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setReuseAddress(JNIEnv
setOption(env, fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setReusePort(JNIEnv * env, jclass clazz, jint fd, jint optval) {
setOption(env, fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpNoDelay(JNIEnv *env, jclass clazz, jint fd, jint optval) {
setOption(env, fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval));
}
@ -940,6 +945,14 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isReuseAddresss(JNIEnv
return optval;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isReusePort(JNIEnv *env, jclass clazz, jint fd) {
int optval;
if (getOption(env, fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)) == -1) {
return -1;
}
return optval;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isTcpNoDelay(JNIEnv *env, jclass clazz, jint fd) {
int optval;
if (getOption(env, fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)) == -1) {
@ -991,3 +1004,16 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv
}
return optval;
}
JNIEXPORT jstring JNICALL Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv *env, jclass clazz) {
struct utsname name;
int res = uname(&name);
if (res == 0) {
return (*env)->NewStringUTF(env, name.release);
}
int err = errno;
throwRuntimeException(env, exceptionMessage("Error during uname(...): ", err));
return NULL;
}

View File

@ -48,6 +48,7 @@ jlong Java_io_netty_channel_epoll_Native_sendfile(JNIEnv *env, jclass clazz, jin
jobject Java_io_netty_channel_epoll_Native_remoteAddress(JNIEnv * env, jclass clazz, jint fd);
jobject Java_io_netty_channel_epoll_Native_localAddress(JNIEnv * env, jclass clazz, jint fd);
void Java_io_netty_channel_epoll_Native_setReuseAddress(JNIEnv * env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setReusePort(JNIEnv * env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setTcpNoDelay(JNIEnv *env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setReceiveBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setSendBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval);
@ -56,9 +57,11 @@ void Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv *env, jclass clazz, ji
void Java_io_netty_channel_epoll_Native_setSoLinger(JNIEnv *env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setTrafficClass(JNIEnv *env, jclass clazz, jint fd, jint optval);
jint Java_io_netty_channel_epoll_Native_isReuseAddresss(JNIEnv *env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_isReusePort(JNIEnv *env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_isTcpNoDelay(JNIEnv *env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_getReceiveBufferSize(JNIEnv * env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_getSendBufferSize(JNIEnv *env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_isTcpCork(JNIEnv *env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_getSoLinger(JNIEnv *env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv *env, jclass clazz, jint fd);
jstring Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv *env, jclass clazz);

View File

@ -21,6 +21,8 @@ public final class EpollChannelOption {
private static final Class<EpollChannelOption> T = EpollChannelOption.class;
public static final ChannelOption<Boolean> TCP_CORK = ChannelOption.valueOf(T, "TCP_CORK");
public static final ChannelOption<Boolean> SO_REUSEPORT = ChannelOption.valueOf(T, "SO_REUSEPORT");
private EpollChannelOption() { }
}

View File

@ -20,7 +20,6 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.ServerSocketChannelConfig;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@ -55,7 +54,7 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme
}
@Override
public ServerSocketChannelConfig config() {
public EpollServerSocketChannelConfig config() {
return config;
}

View File

@ -29,7 +29,7 @@ import static io.netty.channel.ChannelOption.SO_BACKLOG;
import static io.netty.channel.ChannelOption.SO_RCVBUF;
import static io.netty.channel.ChannelOption.SO_REUSEADDR;
final class EpollServerSocketChannelConfig extends DefaultChannelConfig
public final class EpollServerSocketChannelConfig extends DefaultChannelConfig
implements ServerSocketChannelConfig {
private final EpollServerSocketChannel channel;
@ -42,7 +42,7 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG);
return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG, EpollChannelOption.SO_REUSEPORT);
}
@SuppressWarnings("unchecked")
@ -57,7 +57,9 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig
if (option == SO_BACKLOG) {
return (T) Integer.valueOf(getBacklog());
}
if (option == EpollChannelOption.SO_REUSEPORT) {
return (T) Boolean.valueOf(isReusePort());
}
return super.getOption(option);
}
@ -71,6 +73,8 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig
setReuseAddress((Boolean) value);
} else if (option == SO_BACKLOG) {
setBacklog((Integer) value);
} else if (option == EpollChannelOption.SO_REUSEPORT) {
setReusePort((Boolean) value);
} else {
return super.setOption(option, value);
}
@ -84,7 +88,7 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig
}
@Override
public ServerSocketChannelConfig setReuseAddress(boolean reuseAddress) {
public EpollServerSocketChannelConfig setReuseAddress(boolean reuseAddress) {
Native.setReuseAddress(channel.fd, reuseAddress ? 1 : 0);
return this;
}
@ -95,14 +99,14 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig
}
@Override
public ServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
public EpollServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
Native.setReceiveBufferSize(channel.fd, receiveBufferSize);
return this;
}
@Override
public ServerSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
public EpollServerSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
return this;
}
@ -112,7 +116,7 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig
}
@Override
public ServerSocketChannelConfig setBacklog(int backlog) {
public EpollServerSocketChannelConfig setBacklog(int backlog) {
if (backlog < 0) {
throw new IllegalArgumentException("backlog: " + backlog);
}
@ -121,56 +125,76 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig
}
@Override
public ServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
public EpollServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
}
@Override
public ServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
public EpollServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public ServerSocketChannelConfig setWriteSpinCount(int writeSpinCount) {
public EpollServerSocketChannelConfig setWriteSpinCount(int writeSpinCount) {
super.setWriteSpinCount(writeSpinCount);
return this;
}
@Override
public ServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) {
public EpollServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) {
super.setAllocator(allocator);
return this;
}
@Override
public ServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
public EpollServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
super.setRecvByteBufAllocator(allocator);
return this;
}
@Override
public ServerSocketChannelConfig setAutoRead(boolean autoRead) {
public EpollServerSocketChannelConfig setAutoRead(boolean autoRead) {
super.setAutoRead(autoRead);
return this;
}
@Override
public ServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
public EpollServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
@Override
public ServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
public EpollServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
@Override
public ServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
public EpollServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
}
/**
* Returns {@code true} if the SO_REUSEPORT option is set.
*/
public boolean isReusePort() {
return Native.isReusePort(channel.fd) == 1;
}
/**
* Set the SO_REUSEPORT option on the underlying Channel. This will allow to bind multiple
* {@link EpollSocketChannel}s to the same port and so accept connections with multiple threads.
*
* Be aware this method needs be called before {@link EpollSocketChannel#bind(java.net.SocketAddress)} to have
* any affect.
*/
public EpollServerSocketChannelConfig setReusePort(boolean reusePort) {
Native.setReusePort(channel.fd, reusePort ? 1 : 0);
return this;
}
}

View File

@ -127,6 +127,7 @@ final class Native {
public static native int getSendBufferSize(int fd);
public static native int isKeepAlive(int fd);
public static native int isReuseAddress(int fd);
public static native int isReusePort(int fd);
public static native int isTcpNoDelay(int fd);
public static native int isTcpCork(int fd);
public static native int getSoLinger(int fd);
@ -135,12 +136,14 @@ final class Native {
public static native void setKeepAlive(int fd, int keepAlive);
public static native void setReceiveBufferSize(int fd, int receiveBufferSize);
public static native void setReuseAddress(int fd, int reuseAddress);
public static native void setReusePort(int fd, int reuseAddress);
public static native void setSendBufferSize(int fd, int sendBufferSize);
public static native void setTcpNoDelay(int fd, int tcpNoDelay);
public static native void setTcpCork(int fd, int tcpCork);
public static native void setSoLinger(int fd, int soLinger);
public static native void setTrafficClass(int fd, int tcpNoDelay);
public static native String kernelVersion();
private Native() {
// utility
}

View File

@ -0,0 +1,132 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.testsuite.util.TestUtils;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicBoolean;
public class EpollReuseAddrTest {
private static final int MAJOR;
private static final int MINOR;
private static final int BUGFIX;
static {
String kernelVersion = Native.kernelVersion();
int index = kernelVersion.indexOf("-");
if (index > -1) {
kernelVersion = kernelVersion.substring(0, index);
}
String[] versionParts = kernelVersion.split("\\.");
if (versionParts.length == 3) {
MAJOR = Integer.parseInt(versionParts[0]);
MINOR = Integer.parseInt(versionParts[1]);
BUGFIX = Integer.parseInt(versionParts[2]);
} else {
throw new IllegalStateException();
}
}
@Test
public void testMultipleBindWithoutReusePortFails() {
Assume.assumeTrue(versionEqOrGt(3, 9, 0));
ServerBootstrap bootstrap = createBootstrap();
ChannelFuture future = bootstrap.bind().syncUninterruptibly();
try {
bootstrap.bind().syncUninterruptibly();
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e instanceof IOException);
}
future.channel().close().syncUninterruptibly();
}
@Test(timeout = 10000)
public void testMultipleBind() throws Exception {
Assume.assumeTrue(versionEqOrGt(3, 9, 0));
ServerBootstrap bootstrap = createBootstrap();
bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
final AtomicBoolean accepted1 = new AtomicBoolean();
bootstrap.childHandler(new TestHandler(accepted1));
ChannelFuture future = bootstrap.bind().syncUninterruptibly();
final AtomicBoolean accepted2 = new AtomicBoolean();
bootstrap.childHandler(new TestHandler(accepted2));
ChannelFuture future2 = bootstrap.bind().syncUninterruptibly();
InetSocketAddress address = (InetSocketAddress) future2.channel().localAddress();
while (!accepted1.get() || !accepted2.get()) {
Socket socket = new Socket(address.getAddress(), address.getPort());
socket.setReuseAddress(true);
socket.close();
}
future.channel().close().syncUninterruptibly();
future2.channel().close().syncUninterruptibly();
}
private ServerBootstrap createBootstrap() {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(EpollSocketTestPermutation.EPOLL_BOSS_GROUP, EpollSocketTestPermutation.EPOLL_WORKER_GROUP);
bootstrap.channel(EpollServerSocketChannel.class);
bootstrap.childHandler(new ChannelHandlerAdapter() { });
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
InetSocketAddress address = new InetSocketAddress(TestUtils.getFreePort());
bootstrap.localAddress(address);
return bootstrap;
}
private static boolean versionEqOrGt(int major, int minor, int bugfix) {
if (MAJOR > major) {
return true;
} else if (MAJOR == major) {
if (MINOR > minor) {
return true;
} else if (MINOR == minor) {
if (BUGFIX >= bugfix) {
return true;
}
}
}
return false;
}
@ChannelHandler.Sharable
private static class TestHandler extends ChannelInboundHandlerAdapter {
private final AtomicBoolean accepted;
TestHandler(AtomicBoolean accepted) {
this.accepted = accepted;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
accepted.set(true);
ctx.close();
}
}
}

View File

@ -32,9 +32,9 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
static final SocketTestPermutation INSTANCE = new EpollSocketTestPermutation();
private final EventLoopGroup epollBossGroup =
static final EventLoopGroup EPOLL_BOSS_GROUP =
new EpollEventLoopGroup(BOSSES, new DefaultThreadFactory("testsuite-epoll-boss", true));
private final EventLoopGroup epollWorkerGroup =
static final EventLoopGroup EPOLL_WORKER_GROUP =
new EpollEventLoopGroup(WORKERS, new DefaultThreadFactory("testsuite-epoll-worker", true));
@Override
@ -54,7 +54,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
new BootstrapFactory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().group(epollBossGroup, epollWorkerGroup)
return new ServerBootstrap().group(EPOLL_BOSS_GROUP, EPOLL_WORKER_GROUP)
.channel(EpollServerSocketChannel.class);
}
},
@ -74,7 +74,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(epollWorkerGroup).channel(EpollSocketChannel.class);
return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollSocketChannel.class);
}
},
new BootstrapFactory<Bootstrap>() {