diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java index 8a57706b70..8b6166de29 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java @@ -45,7 +45,8 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple ChannelOption.SO_BROADCAST, ChannelOption.SO_RCVBUF, ChannelOption.SO_SNDBUF, ChannelOption.SO_REUSEADDR, ChannelOption.IP_MULTICAST_LOOP_DISABLED, ChannelOption.IP_MULTICAST_ADDR, ChannelOption.IP_MULTICAST_IF, ChannelOption.IP_MULTICAST_TTL, - ChannelOption.IP_TOS, ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION); + ChannelOption.IP_TOS, ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, + EpollChannelOption.SO_REUSEPORT); } @SuppressWarnings("unchecked") @@ -83,6 +84,9 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple if (option == ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) { return (T) Boolean.valueOf(activeOnOpen); } + if (option == EpollChannelOption.SO_REUSEPORT) { + return (T) Boolean.valueOf(isReusePort()); + } return super.getOption(option); } @@ -110,6 +114,8 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple setTrafficClass((Integer) value); } else if (option == ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) { setActiveOnOpen((Boolean) value); + } else if (option == EpollChannelOption.SO_REUSEPORT) { + setReusePort((Boolean) value); } else { return super.setOption(option, value); } @@ -279,6 +285,25 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple throw new UnsupportedOperationException("Multicast not supported"); } + /** + * Returns {@code true} if the SO_REUSEPORT option is set. + */ + public boolean isReusePort() { + return Native.isReusePort(datagramChannel.fd) == 1; + } + + /** + * Set the SO_REUSEPORT option on the underlying Channel. This will allow to bind multiple + * {@link EpollSocketChannel}s to the same port and so accept connections with multiple threads. + * + * Be aware this method needs be called before {@link EpollDatagramChannel#bind(java.net.SocketAddress)} to have + * any affect. + */ + public EpollDatagramChannelConfig setReusePort(boolean reusePort) { + Native.setReusePort(datagramChannel.fd, reusePort ? 1 : 0); + return this; + } + @Override protected void autoReadCleared() { datagramChannel.clearEpollIn(); diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollReuseAddrTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollReuseAddrTest.java index 5b5fc5a5ff..3170b94090 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollReuseAddrTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollReuseAddrTest.java @@ -15,21 +15,30 @@ */ package io.netty.channel.epoll; +import io.netty.bootstrap.AbstractBootstrap; +import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelOption; import io.netty.testsuite.util.TestUtils; +import io.netty.util.NetUtil; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.ResourceLeakDetector; import org.junit.Assert; import org.junit.Assume; import org.junit.Test; import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; public class EpollReuseAddrTest { @@ -53,9 +62,19 @@ public class EpollReuseAddrTest { } @Test - public void testMultipleBindWithoutReusePortFails() { + public void testMultipleBindSocketChannelWithoutReusePortFails() { Assume.assumeTrue(versionEqOrGt(3, 9, 0)); - ServerBootstrap bootstrap = createBootstrap(); + testMultipleBindDatagramChannelWithoutReusePortFails0(createServerBootstrap()); + } + + @Test + public void testMultipleBindDatagramChannelWithoutReusePortFails() { + Assume.assumeTrue(versionEqOrGt(3, 9, 0)); + testMultipleBindDatagramChannelWithoutReusePortFails0(createBootstrap()); + } + + private static void testMultipleBindDatagramChannelWithoutReusePortFails0(AbstractBootstrap bootstrap) { + bootstrap.handler(new DummyHandler()); ChannelFuture future = bootstrap.bind().syncUninterruptibly(); try { bootstrap.bind().syncUninterruptibly(); @@ -67,21 +86,23 @@ public class EpollReuseAddrTest { } @Test(timeout = 10000) - public void testMultipleBind() throws Exception { + public void testMultipleBindSocketChannel() throws Exception { Assume.assumeTrue(versionEqOrGt(3, 9, 0)); - ServerBootstrap bootstrap = createBootstrap(); + ServerBootstrap bootstrap = createServerBootstrap(); bootstrap.option(EpollChannelOption.SO_REUSEPORT, true); final AtomicBoolean accepted1 = new AtomicBoolean(); - bootstrap.childHandler(new TestHandler(accepted1)); + bootstrap.childHandler(new ServerSocketTestHandler(accepted1)); ChannelFuture future = bootstrap.bind().syncUninterruptibly(); + InetSocketAddress address1 = (InetSocketAddress) future.channel().localAddress(); final AtomicBoolean accepted2 = new AtomicBoolean(); - bootstrap.childHandler(new TestHandler(accepted2)); + bootstrap.childHandler(new ServerSocketTestHandler(accepted2)); ChannelFuture future2 = bootstrap.bind().syncUninterruptibly(); - InetSocketAddress address = (InetSocketAddress) future2.channel().localAddress(); + InetSocketAddress address2 = (InetSocketAddress) future2.channel().localAddress(); + Assert.assertEquals(address1, address2); while (!accepted1.get() || !accepted2.get()) { - Socket socket = new Socket(address.getAddress(), address.getPort()); + Socket socket = new Socket(address1.getAddress(), address1.getPort()); socket.setReuseAddress(true); socket.close(); } @@ -89,13 +110,73 @@ public class EpollReuseAddrTest { future2.channel().close().syncUninterruptibly(); } - private ServerBootstrap createBootstrap() { + @Test(timeout = 10000) + public void testMultipleBindDatagramChannel() throws Exception { + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); + Assume.assumeTrue(versionEqOrGt(3, 9, 0)); + Bootstrap bootstrap = createBootstrap(); + bootstrap.option(EpollChannelOption.SO_REUSEPORT, true); + final AtomicBoolean received1 = new AtomicBoolean(); + bootstrap.handler(new DatagramSocketTestHandler(received1)); + ChannelFuture future = bootstrap.bind().syncUninterruptibly(); + final InetSocketAddress address1 = (InetSocketAddress) future.channel().localAddress(); + + final AtomicBoolean received2 = new AtomicBoolean(); + bootstrap.handler(new DatagramSocketTestHandler(received2)); + ChannelFuture future2 = bootstrap.bind().syncUninterruptibly(); + final InetSocketAddress address2 = (InetSocketAddress) future2.channel().localAddress(); + + Assert.assertEquals(address1, address2); + final byte[] bytes = "data".getBytes(); + + // fire up 16 Threads and send DatagramPackets to make sure we stress it enough to see DatagramPackets received + // on both sockets. + int count = 16; + final CountDownLatch latch = new CountDownLatch(count); + Runnable r = new Runnable() { + @Override + public void run() { + try { + DatagramSocket socket = new DatagramSocket(); + while (!received1.get() || !received2.get()) { + socket.send(new DatagramPacket( + bytes, 0, bytes.length, address1.getAddress(), address1.getPort())); + } + socket.close(); + } catch (IOException e) { + e.printStackTrace(); + } + latch.countDown(); + } + }; + + ExecutorService executor = Executors.newFixedThreadPool(count); + for (int i = 0 ; i < count; i++) { + executor.execute(r); + } + latch.await(); + executor.shutdown(); + future.channel().close().syncUninterruptibly(); + future2.channel().close().syncUninterruptibly(); + Assert.assertTrue(received1.get()); + Assert.assertTrue(received2.get()); + } + + private ServerBootstrap createServerBootstrap() { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(EpollSocketTestPermutation.EPOLL_BOSS_GROUP, EpollSocketTestPermutation.EPOLL_WORKER_GROUP); bootstrap.channel(EpollServerSocketChannel.class); - bootstrap.childHandler(new ChannelHandlerAdapter() { }); - bootstrap.option(ChannelOption.SO_REUSEADDR, true); - InetSocketAddress address = new InetSocketAddress(TestUtils.getFreePort()); + bootstrap.childHandler(new DummyHandler()); + InetSocketAddress address = new InetSocketAddress(NetUtil.LOCALHOST, TestUtils.getFreePort()); + bootstrap.localAddress(address); + return bootstrap; + } + + private Bootstrap createBootstrap() { + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(EpollSocketTestPermutation.EPOLL_WORKER_GROUP); + bootstrap.channel(EpollDatagramChannel.class); + InetSocketAddress address = new InetSocketAddress(NetUtil.LOCALHOST, TestUtils.getFreePort()); bootstrap.localAddress(address); return bootstrap; } @@ -116,10 +197,10 @@ public class EpollReuseAddrTest { } @ChannelHandler.Sharable - private static class TestHandler extends ChannelInboundHandlerAdapter { + private static class ServerSocketTestHandler extends ChannelInboundHandlerAdapter { private final AtomicBoolean accepted; - TestHandler(AtomicBoolean accepted) { + ServerSocketTestHandler(AtomicBoolean accepted) { this.accepted = accepted; } @@ -129,4 +210,22 @@ public class EpollReuseAddrTest { ctx.close(); } } + + @ChannelHandler.Sharable + private static class DatagramSocketTestHandler extends ChannelInboundHandlerAdapter { + private final AtomicBoolean received; + + DatagramSocketTestHandler(AtomicBoolean received) { + this.received = received; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ReferenceCountUtil.release(msg); + received.set(true); + } + } + + @ChannelHandler.Sharable + private static final class DummyHandler extends ChannelHandlerAdapter { } }