From 02c3e710128c5510fbafa1b37603c4ffda326cd8 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 11 Apr 2014 15:54:31 +0200 Subject: [PATCH] [#2376] Add support for SO_REUSEPORT in native transport Motivation: In linux kernel 3.9 a new featured named SO_REUSEPORT was introduced which allows to have multiple sockets bind to the same port and so handle the accept() of new connections with multiple threads. This can greatly improve the performance when you not to accept a lot of connections. Modifications: Implement SO_REUSEPORT via JNI Result: Be able to use the SO_REUSEPORT feature when using the EpollServerSocketChannel --- .../main/c/io_netty_channel_epoll_Native.c | 26 ++++ .../main/c/io_netty_channel_epoll_Native.h | 3 + .../channel/epoll/EpollChannelOption.java | 2 + .../epoll/EpollServerSocketChannel.java | 3 +- .../epoll/EpollServerSocketChannelConfig.java | 56 +++++--- .../java/io/netty/channel/epoll/Native.java | 3 + .../channel/epoll/EpollReuseAddrTest.java | 131 ++++++++++++++++++ .../epoll/EpollSocketTestPermutation.java | 8 +- 8 files changed, 210 insertions(+), 22 deletions(-) create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollReuseAddrTest.java diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c index b140c317fe..963b2e1959 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c @@ -27,6 +27,7 @@ #include #include #include +#include #include "io_netty_channel_epoll_Native.h" @@ -845,6 +846,10 @@ JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setReuseAddress(JNIEnv setOption(env, fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); } +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setReusePort(JNIEnv * env, jclass clazz, jint fd, jint optval) { + setOption(env, fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)); +} + JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpNoDelay(JNIEnv *env, jclass clazz, jint fd, jint optval) { setOption(env, fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)); } @@ -889,6 +894,14 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isReuseAddresss(JNIEnv return optval; } +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isReusePort(JNIEnv *env, jclass clazz, jint fd) { + int optval; + if (getOption(env, fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isTcpNoDelay(JNIEnv *env, jclass clazz, jint fd) { int optval; if (getOption(env, fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)) == -1) { @@ -940,3 +953,16 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv } return optval; } + +JNIEXPORT jstring JNICALL Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv *env, jclass clazz) { + struct utsname name; + + int res = uname(&name); + if (res == 0) { + return (*env)->NewStringUTF(env, name.release); + } + int err = errno; + throwRuntimeException(env, exceptionMessage("Error during uname(...): ", err)); + return NULL; + +} diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h index 9f085af216..39ec6c8af1 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h @@ -46,6 +46,7 @@ jlong Java_io_netty_channel_epoll_Native_sendfile(JNIEnv *env, jclass clazz, jin jobject Java_io_netty_channel_epoll_Native_remoteAddress(JNIEnv * env, jclass clazz, jint fd); jobject Java_io_netty_channel_epoll_Native_localAddress(JNIEnv * env, jclass clazz, jint fd); void Java_io_netty_channel_epoll_Native_setReuseAddress(JNIEnv * env, jclass clazz, jint fd, jint optval); +void Java_io_netty_channel_epoll_Native_setReusePort(JNIEnv * env, jclass clazz, jint fd, jint optval); void Java_io_netty_channel_epoll_Native_setTcpNoDelay(JNIEnv *env, jclass clazz, jint fd, jint optval); void Java_io_netty_channel_epoll_Native_setReceiveBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval); void Java_io_netty_channel_epoll_Native_setSendBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval); @@ -54,9 +55,11 @@ void Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv *env, jclass clazz, ji void Java_io_netty_channel_epoll_Native_setSoLinger(JNIEnv *env, jclass clazz, jint fd, jint optval); void Java_io_netty_channel_epoll_Native_setTrafficClass(JNIEnv *env, jclass clazz, jint fd, jint optval); jint Java_io_netty_channel_epoll_Native_isReuseAddresss(JNIEnv *env, jclass clazz, jint fd); +jint Java_io_netty_channel_epoll_Native_isReusePort(JNIEnv *env, jclass clazz, jint fd); jint Java_io_netty_channel_epoll_Native_isTcpNoDelay(JNIEnv *env, jclass clazz, jint fd); jint Java_io_netty_channel_epoll_Native_getReceiveBufferSize(JNIEnv * env, jclass clazz, jint fd); jint Java_io_netty_channel_epoll_Native_getSendBufferSize(JNIEnv *env, jclass clazz, jint fd); jint Java_io_netty_channel_epoll_Native_isTcpCork(JNIEnv *env, jclass clazz, jint fd); jint Java_io_netty_channel_epoll_Native_getSoLinger(JNIEnv *env, jclass clazz, jint fd); jint Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv *env, jclass clazz, jint fd); +jstring Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv *env, jclass clazz); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java index 8931a83cb8..a1d8fd3b7a 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java @@ -21,6 +21,8 @@ public final class EpollChannelOption extends ChannelOption { public static final ChannelOption TCP_CORK = valueOf("TCP_CORK"); + public static final ChannelOption SO_REUSEPORT = valueOf("SO_REUSEPORT"); + @SuppressWarnings({ "unused", "deprecation" }) private EpollChannelOption(String name) { super(name); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java index fb851e1ab8..ae5d545b87 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java @@ -20,7 +20,6 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; import io.netty.channel.socket.ServerSocketChannel; -import io.netty.channel.socket.ServerSocketChannelConfig; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -55,7 +54,7 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme } @Override - public ServerSocketChannelConfig config() { + public EpollServerSocketChannelConfig config() { return config; } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java index 791a65b46a..9b8a947def 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java @@ -29,7 +29,7 @@ import static io.netty.channel.ChannelOption.SO_BACKLOG; import static io.netty.channel.ChannelOption.SO_RCVBUF; import static io.netty.channel.ChannelOption.SO_REUSEADDR; -final class EpollServerSocketChannelConfig extends DefaultChannelConfig +public final class EpollServerSocketChannelConfig extends DefaultChannelConfig implements ServerSocketChannelConfig { private final EpollServerSocketChannel channel; @@ -42,7 +42,7 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig @Override public Map, Object> getOptions() { - return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG); + return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG, EpollChannelOption.SO_REUSEPORT); } @SuppressWarnings("unchecked") @@ -57,7 +57,9 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig if (option == SO_BACKLOG) { return (T) Integer.valueOf(getBacklog()); } - + if (option == EpollChannelOption.SO_REUSEPORT) { + return (T) Boolean.valueOf(isReusePort()); + } return super.getOption(option); } @@ -71,6 +73,8 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig setReuseAddress((Boolean) value); } else if (option == SO_BACKLOG) { setBacklog((Integer) value); + } else if (option == EpollChannelOption.SO_REUSEPORT) { + setReusePort((Boolean) value); } else { return super.setOption(option, value); } @@ -84,7 +88,7 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig } @Override - public ServerSocketChannelConfig setReuseAddress(boolean reuseAddress) { + public EpollServerSocketChannelConfig setReuseAddress(boolean reuseAddress) { Native.setReuseAddress(channel.fd, reuseAddress ? 1 : 0); return this; } @@ -95,14 +99,14 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig } @Override - public ServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) { + public EpollServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) { Native.setReceiveBufferSize(channel.fd, receiveBufferSize); return this; } @Override - public ServerSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth) { + public EpollServerSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth) { return this; } @@ -112,7 +116,7 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig } @Override - public ServerSocketChannelConfig setBacklog(int backlog) { + public EpollServerSocketChannelConfig setBacklog(int backlog) { if (backlog < 0) { throw new IllegalArgumentException("backlog: " + backlog); } @@ -121,56 +125,76 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig } @Override - public ServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { + public EpollServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { super.setConnectTimeoutMillis(connectTimeoutMillis); return this; } @Override - public ServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { + public EpollServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { super.setMaxMessagesPerRead(maxMessagesPerRead); return this; } @Override - public ServerSocketChannelConfig setWriteSpinCount(int writeSpinCount) { + public EpollServerSocketChannelConfig setWriteSpinCount(int writeSpinCount) { super.setWriteSpinCount(writeSpinCount); return this; } @Override - public ServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) { + public EpollServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) { super.setAllocator(allocator); return this; } @Override - public ServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { + public EpollServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { super.setRecvByteBufAllocator(allocator); return this; } @Override - public ServerSocketChannelConfig setAutoRead(boolean autoRead) { + public EpollServerSocketChannelConfig setAutoRead(boolean autoRead) { super.setAutoRead(autoRead); return this; } @Override - public ServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { + public EpollServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); return this; } @Override - public ServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { + public EpollServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); return this; } @Override - public ServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { + public EpollServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { super.setMessageSizeEstimator(estimator); return this; } + + /** + * Returns {@code true} if the SO_REUSEPORT option is set. + */ + public boolean isReusePort() { + return Native.isReusePort(channel.fd) == 1; + } + + /** + * Set the SO_REUSEPORT option on the underlying Channel. This will allow to bind multiple + * {@link EpollSocketChannel}s to the same port and so accept connections with multiple threads. + * + * Be aware this method needs be called before {@link EpollSocketChannel#bind(java.net.SocketAddress)} to have + * any affect. + */ + public EpollServerSocketChannelConfig setReusePort(boolean reusePort) { + Native.setReusePort(channel.fd, reusePort ? 1 : 0); + return this; + } + } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index a8007d4481..2544f8934d 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -123,6 +123,7 @@ final class Native { public static native int getSendBufferSize(int fd); public static native int isKeepAlive(int fd); public static native int isReuseAddress(int fd); + public static native int isReusePort(int fd); public static native int isTcpNoDelay(int fd); public static native int isTcpCork(int fd); public static native int getSoLinger(int fd); @@ -131,12 +132,14 @@ final class Native { public static native void setKeepAlive(int fd, int keepAlive); public static native void setReceiveBufferSize(int fd, int receiveBufferSize); public static native void setReuseAddress(int fd, int reuseAddress); + public static native void setReusePort(int fd, int reuseAddress); public static native void setSendBufferSize(int fd, int sendBufferSize); public static native void setTcpNoDelay(int fd, int tcpNoDelay); public static native void setTcpCork(int fd, int tcpCork); public static native void setSoLinger(int fd, int soLinger); public static native void setTrafficClass(int fd, int tcpNoDelay); + public static native String kernelVersion(); private Native() { // utility } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollReuseAddrTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollReuseAddrTest.java new file mode 100644 index 0000000000..1d37293fa0 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollReuseAddrTest.java @@ -0,0 +1,131 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.testsuite.util.TestUtils; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.concurrent.atomic.AtomicBoolean; + +public class EpollReuseAddrTest { + private static final int MAJOR; + private static final int MINOR; + private static final int BUGFIX; + static { + String kernelVersion = Native.kernelVersion(); + int index = kernelVersion.indexOf("-"); + if (index > -1) { + kernelVersion = kernelVersion.substring(0, index); + } + String[] versionParts = kernelVersion.split("\\."); + if (versionParts.length == 3) { + MAJOR = Integer.parseInt(versionParts[0]); + MINOR = Integer.parseInt(versionParts[1]); + BUGFIX = Integer.parseInt(versionParts[2]); + } else { + throw new IllegalStateException(); + } + } + + @Test + public void testMultipleBindWithoutReusePortFails() { + Assume.assumeTrue(versionEqOrGt(3, 9, 0)); + ServerBootstrap bootstrap = createBootstrap(); + ChannelFuture future = bootstrap.bind().syncUninterruptibly(); + try { + bootstrap.bind().syncUninterruptibly(); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e instanceof IOException); + } + future.channel().close().syncUninterruptibly(); + } + + @Test(timeout = 10000) + public void testMultipleBind() throws Exception { + Assume.assumeTrue(versionEqOrGt(3, 9, 0)); + ServerBootstrap bootstrap = createBootstrap(); + bootstrap.option(EpollChannelOption.SO_REUSEPORT, true); + final AtomicBoolean accepted1 = new AtomicBoolean(); + bootstrap.childHandler(new TestHandler(accepted1)); + ChannelFuture future = bootstrap.bind().syncUninterruptibly(); + + final AtomicBoolean accepted2 = new AtomicBoolean(); + bootstrap.childHandler(new TestHandler(accepted2)); + ChannelFuture future2 = bootstrap.bind().syncUninterruptibly(); + InetSocketAddress address = (InetSocketAddress) future2.channel().localAddress(); + + while (!accepted1.get() || !accepted2.get()) { + Socket socket = new Socket(address.getAddress(), address.getPort()); + socket.setReuseAddress(true); + socket.close(); + } + future.channel().close().syncUninterruptibly(); + future2.channel().close().syncUninterruptibly(); + } + + private ServerBootstrap createBootstrap() { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(EpollSocketTestPermutation.EPOLL_BOSS_GROUP, EpollSocketTestPermutation.EPOLL_WORKER_GROUP); + bootstrap.channel(EpollServerSocketChannel.class); + bootstrap.childHandler(new ChannelHandlerAdapter() { }); + bootstrap.option(EpollChannelOption.SO_REUSEADDR, true); + InetSocketAddress address = new InetSocketAddress(TestUtils.getFreePort()); + bootstrap.localAddress(address); + return bootstrap; + } + + private static boolean versionEqOrGt(int major, int minor, int bugfix) { + if (MAJOR > major) { + return true; + } else if (MAJOR == major) { + if (MINOR > minor) { + return true; + } else if (MINOR == minor) { + if (BUGFIX >= bugfix) { + return true; + } + } + } + return false; + } + + @ChannelHandler.Sharable + private static class TestHandler extends ChannelInboundHandlerAdapter { + private final AtomicBoolean accepted; + + TestHandler(AtomicBoolean accepted) { + this.accepted = accepted; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + accepted.set(true); + ctx.close(); + } + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java index a452a956e4..e08562e5e6 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java @@ -32,9 +32,9 @@ class EpollSocketTestPermutation extends SocketTestPermutation { static final SocketTestPermutation INSTANCE = new EpollSocketTestPermutation(); - private final EventLoopGroup epollBossGroup = + static final EventLoopGroup EPOLL_BOSS_GROUP = new EpollEventLoopGroup(BOSSES, new DefaultThreadFactory("testsuite-epoll-boss", true)); - private final EventLoopGroup epollWorkerGroup = + static final EventLoopGroup EPOLL_WORKER_GROUP = new EpollEventLoopGroup(WORKERS, new DefaultThreadFactory("testsuite-epoll-worker", true)); @Override @@ -54,7 +54,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation { new BootstrapFactory() { @Override public ServerBootstrap newInstance() { - return new ServerBootstrap().group(epollBossGroup, epollWorkerGroup) + return new ServerBootstrap().group(EPOLL_BOSS_GROUP, EPOLL_WORKER_GROUP) .channel(EpollServerSocketChannel.class); } }, @@ -74,7 +74,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation { new BootstrapFactory() { @Override public Bootstrap newInstance() { - return new Bootstrap().group(epollWorkerGroup).channel(EpollSocketChannel.class); + return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollSocketChannel.class); } }, new BootstrapFactory() {