diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c index 6d7c407f74..d8b3184454 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c @@ -27,6 +27,7 @@ #include #include #include +#include #include "io_netty_channel_epoll_Native.h" @@ -896,6 +897,10 @@ JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setReuseAddress(JNIEnv setOption(env, fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); } +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setReusePort(JNIEnv * env, jclass clazz, jint fd, jint optval) { + setOption(env, fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)); +} + JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpNoDelay(JNIEnv *env, jclass clazz, jint fd, jint optval) { setOption(env, fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)); } @@ -940,6 +945,14 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isReuseAddresss(JNIEnv return optval; } +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isReusePort(JNIEnv *env, jclass clazz, jint fd) { + int optval; + if (getOption(env, fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isTcpNoDelay(JNIEnv *env, jclass clazz, jint fd) { int optval; if (getOption(env, fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)) == -1) { @@ -991,3 +1004,16 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv } return optval; } + +JNIEXPORT jstring JNICALL Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv *env, jclass clazz) { + struct utsname name; + + int res = uname(&name); + if (res == 0) { + return (*env)->NewStringUTF(env, name.release); + } + int err = errno; + throwRuntimeException(env, exceptionMessage("Error during uname(...): ", err)); + return NULL; + +} diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h index 6b6943da45..a82107ee02 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h @@ -48,6 +48,7 @@ jlong Java_io_netty_channel_epoll_Native_sendfile(JNIEnv *env, jclass clazz, jin jobject Java_io_netty_channel_epoll_Native_remoteAddress(JNIEnv * env, jclass clazz, jint fd); jobject Java_io_netty_channel_epoll_Native_localAddress(JNIEnv * env, jclass clazz, jint fd); void Java_io_netty_channel_epoll_Native_setReuseAddress(JNIEnv * env, jclass clazz, jint fd, jint optval); +void Java_io_netty_channel_epoll_Native_setReusePort(JNIEnv * env, jclass clazz, jint fd, jint optval); void Java_io_netty_channel_epoll_Native_setTcpNoDelay(JNIEnv *env, jclass clazz, jint fd, jint optval); void Java_io_netty_channel_epoll_Native_setReceiveBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval); void Java_io_netty_channel_epoll_Native_setSendBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval); @@ -56,9 +57,11 @@ void Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv *env, jclass clazz, ji void Java_io_netty_channel_epoll_Native_setSoLinger(JNIEnv *env, jclass clazz, jint fd, jint optval); void Java_io_netty_channel_epoll_Native_setTrafficClass(JNIEnv *env, jclass clazz, jint fd, jint optval); jint Java_io_netty_channel_epoll_Native_isReuseAddresss(JNIEnv *env, jclass clazz, jint fd); +jint Java_io_netty_channel_epoll_Native_isReusePort(JNIEnv *env, jclass clazz, jint fd); jint Java_io_netty_channel_epoll_Native_isTcpNoDelay(JNIEnv *env, jclass clazz, jint fd); jint Java_io_netty_channel_epoll_Native_getReceiveBufferSize(JNIEnv * env, jclass clazz, jint fd); jint Java_io_netty_channel_epoll_Native_getSendBufferSize(JNIEnv *env, jclass clazz, jint fd); jint Java_io_netty_channel_epoll_Native_isTcpCork(JNIEnv *env, jclass clazz, jint fd); jint Java_io_netty_channel_epoll_Native_getSoLinger(JNIEnv *env, jclass clazz, jint fd); jint Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv *env, jclass clazz, jint fd); +jstring Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv *env, jclass clazz); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java index 58f189641f..060874563b 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java @@ -21,6 +21,8 @@ public final class EpollChannelOption { private static final Class T = EpollChannelOption.class; public static final ChannelOption TCP_CORK = ChannelOption.valueOf(T, "TCP_CORK"); + public static final ChannelOption SO_REUSEPORT = ChannelOption.valueOf(T, "SO_REUSEPORT"); private EpollChannelOption() { } + } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java index fb851e1ab8..ae5d545b87 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java @@ -20,7 +20,6 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; import io.netty.channel.socket.ServerSocketChannel; -import io.netty.channel.socket.ServerSocketChannelConfig; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -55,7 +54,7 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme } @Override - public ServerSocketChannelConfig config() { + public EpollServerSocketChannelConfig config() { return config; } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java index 791a65b46a..9b8a947def 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java @@ -29,7 +29,7 @@ import static io.netty.channel.ChannelOption.SO_BACKLOG; import static io.netty.channel.ChannelOption.SO_RCVBUF; import static io.netty.channel.ChannelOption.SO_REUSEADDR; -final class EpollServerSocketChannelConfig extends DefaultChannelConfig +public final class EpollServerSocketChannelConfig extends DefaultChannelConfig implements ServerSocketChannelConfig { private final EpollServerSocketChannel channel; @@ -42,7 +42,7 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig @Override public Map, Object> getOptions() { - return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG); + return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG, EpollChannelOption.SO_REUSEPORT); } @SuppressWarnings("unchecked") @@ -57,7 +57,9 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig if (option == SO_BACKLOG) { return (T) Integer.valueOf(getBacklog()); } - + if (option == EpollChannelOption.SO_REUSEPORT) { + return (T) Boolean.valueOf(isReusePort()); + } return super.getOption(option); } @@ -71,6 +73,8 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig setReuseAddress((Boolean) value); } else if (option == SO_BACKLOG) { setBacklog((Integer) value); + } else if (option == EpollChannelOption.SO_REUSEPORT) { + setReusePort((Boolean) value); } else { return super.setOption(option, value); } @@ -84,7 +88,7 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig } @Override - public ServerSocketChannelConfig setReuseAddress(boolean reuseAddress) { + public EpollServerSocketChannelConfig setReuseAddress(boolean reuseAddress) { Native.setReuseAddress(channel.fd, reuseAddress ? 1 : 0); return this; } @@ -95,14 +99,14 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig } @Override - public ServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) { + public EpollServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) { Native.setReceiveBufferSize(channel.fd, receiveBufferSize); return this; } @Override - public ServerSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth) { + public EpollServerSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth) { return this; } @@ -112,7 +116,7 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig } @Override - public ServerSocketChannelConfig setBacklog(int backlog) { + public EpollServerSocketChannelConfig setBacklog(int backlog) { if (backlog < 0) { throw new IllegalArgumentException("backlog: " + backlog); } @@ -121,56 +125,76 @@ final class EpollServerSocketChannelConfig extends DefaultChannelConfig } @Override - public ServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { + public EpollServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { super.setConnectTimeoutMillis(connectTimeoutMillis); return this; } @Override - public ServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { + public EpollServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { super.setMaxMessagesPerRead(maxMessagesPerRead); return this; } @Override - public ServerSocketChannelConfig setWriteSpinCount(int writeSpinCount) { + public EpollServerSocketChannelConfig setWriteSpinCount(int writeSpinCount) { super.setWriteSpinCount(writeSpinCount); return this; } @Override - public ServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) { + public EpollServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) { super.setAllocator(allocator); return this; } @Override - public ServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { + public EpollServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { super.setRecvByteBufAllocator(allocator); return this; } @Override - public ServerSocketChannelConfig setAutoRead(boolean autoRead) { + public EpollServerSocketChannelConfig setAutoRead(boolean autoRead) { super.setAutoRead(autoRead); return this; } @Override - public ServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { + public EpollServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); return this; } @Override - public ServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { + public EpollServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); return this; } @Override - public ServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { + public EpollServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { super.setMessageSizeEstimator(estimator); return this; } + + /** + * Returns {@code true} if the SO_REUSEPORT option is set. + */ + public boolean isReusePort() { + return Native.isReusePort(channel.fd) == 1; + } + + /** + * Set the SO_REUSEPORT option on the underlying Channel. This will allow to bind multiple + * {@link EpollSocketChannel}s to the same port and so accept connections with multiple threads. + * + * Be aware this method needs be called before {@link EpollSocketChannel#bind(java.net.SocketAddress)} to have + * any affect. + */ + public EpollServerSocketChannelConfig setReusePort(boolean reusePort) { + Native.setReusePort(channel.fd, reusePort ? 1 : 0); + return this; + } + } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index 43f7cfeebb..0a9532ede4 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -127,6 +127,7 @@ final class Native { public static native int getSendBufferSize(int fd); public static native int isKeepAlive(int fd); public static native int isReuseAddress(int fd); + public static native int isReusePort(int fd); public static native int isTcpNoDelay(int fd); public static native int isTcpCork(int fd); public static native int getSoLinger(int fd); @@ -135,12 +136,14 @@ final class Native { public static native void setKeepAlive(int fd, int keepAlive); public static native void setReceiveBufferSize(int fd, int receiveBufferSize); public static native void setReuseAddress(int fd, int reuseAddress); + public static native void setReusePort(int fd, int reuseAddress); public static native void setSendBufferSize(int fd, int sendBufferSize); public static native void setTcpNoDelay(int fd, int tcpNoDelay); public static native void setTcpCork(int fd, int tcpCork); public static native void setSoLinger(int fd, int soLinger); public static native void setTrafficClass(int fd, int tcpNoDelay); + public static native String kernelVersion(); private Native() { // utility } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollReuseAddrTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollReuseAddrTest.java new file mode 100644 index 0000000000..5b5fc5a5ff --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollReuseAddrTest.java @@ -0,0 +1,132 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelOption; +import io.netty.testsuite.util.TestUtils; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.concurrent.atomic.AtomicBoolean; + +public class EpollReuseAddrTest { + private static final int MAJOR; + private static final int MINOR; + private static final int BUGFIX; + static { + String kernelVersion = Native.kernelVersion(); + int index = kernelVersion.indexOf("-"); + if (index > -1) { + kernelVersion = kernelVersion.substring(0, index); + } + String[] versionParts = kernelVersion.split("\\."); + if (versionParts.length == 3) { + MAJOR = Integer.parseInt(versionParts[0]); + MINOR = Integer.parseInt(versionParts[1]); + BUGFIX = Integer.parseInt(versionParts[2]); + } else { + throw new IllegalStateException(); + } + } + + @Test + public void testMultipleBindWithoutReusePortFails() { + Assume.assumeTrue(versionEqOrGt(3, 9, 0)); + ServerBootstrap bootstrap = createBootstrap(); + ChannelFuture future = bootstrap.bind().syncUninterruptibly(); + try { + bootstrap.bind().syncUninterruptibly(); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e instanceof IOException); + } + future.channel().close().syncUninterruptibly(); + } + + @Test(timeout = 10000) + public void testMultipleBind() throws Exception { + Assume.assumeTrue(versionEqOrGt(3, 9, 0)); + ServerBootstrap bootstrap = createBootstrap(); + bootstrap.option(EpollChannelOption.SO_REUSEPORT, true); + final AtomicBoolean accepted1 = new AtomicBoolean(); + bootstrap.childHandler(new TestHandler(accepted1)); + ChannelFuture future = bootstrap.bind().syncUninterruptibly(); + + final AtomicBoolean accepted2 = new AtomicBoolean(); + bootstrap.childHandler(new TestHandler(accepted2)); + ChannelFuture future2 = bootstrap.bind().syncUninterruptibly(); + InetSocketAddress address = (InetSocketAddress) future2.channel().localAddress(); + + while (!accepted1.get() || !accepted2.get()) { + Socket socket = new Socket(address.getAddress(), address.getPort()); + socket.setReuseAddress(true); + socket.close(); + } + future.channel().close().syncUninterruptibly(); + future2.channel().close().syncUninterruptibly(); + } + + private ServerBootstrap createBootstrap() { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(EpollSocketTestPermutation.EPOLL_BOSS_GROUP, EpollSocketTestPermutation.EPOLL_WORKER_GROUP); + bootstrap.channel(EpollServerSocketChannel.class); + bootstrap.childHandler(new ChannelHandlerAdapter() { }); + bootstrap.option(ChannelOption.SO_REUSEADDR, true); + InetSocketAddress address = new InetSocketAddress(TestUtils.getFreePort()); + bootstrap.localAddress(address); + return bootstrap; + } + + private static boolean versionEqOrGt(int major, int minor, int bugfix) { + if (MAJOR > major) { + return true; + } else if (MAJOR == major) { + if (MINOR > minor) { + return true; + } else if (MINOR == minor) { + if (BUGFIX >= bugfix) { + return true; + } + } + } + return false; + } + + @ChannelHandler.Sharable + private static class TestHandler extends ChannelInboundHandlerAdapter { + private final AtomicBoolean accepted; + + TestHandler(AtomicBoolean accepted) { + this.accepted = accepted; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + accepted.set(true); + ctx.close(); + } + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java index a452a956e4..e08562e5e6 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java @@ -32,9 +32,9 @@ class EpollSocketTestPermutation extends SocketTestPermutation { static final SocketTestPermutation INSTANCE = new EpollSocketTestPermutation(); - private final EventLoopGroup epollBossGroup = + static final EventLoopGroup EPOLL_BOSS_GROUP = new EpollEventLoopGroup(BOSSES, new DefaultThreadFactory("testsuite-epoll-boss", true)); - private final EventLoopGroup epollWorkerGroup = + static final EventLoopGroup EPOLL_WORKER_GROUP = new EpollEventLoopGroup(WORKERS, new DefaultThreadFactory("testsuite-epoll-worker", true)); @Override @@ -54,7 +54,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation { new BootstrapFactory() { @Override public ServerBootstrap newInstance() { - return new ServerBootstrap().group(epollBossGroup, epollWorkerGroup) + return new ServerBootstrap().group(EPOLL_BOSS_GROUP, EPOLL_WORKER_GROUP) .channel(EpollServerSocketChannel.class); } }, @@ -74,7 +74,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation { new BootstrapFactory() { @Override public Bootstrap newInstance() { - return new Bootstrap().group(epollWorkerGroup).channel(EpollSocketChannel.class); + return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollSocketChannel.class); } }, new BootstrapFactory() {