From 199d2b499c0091d992f8693d76b879c247f09d11 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 17 Apr 2014 11:19:00 +0200 Subject: [PATCH] [#2405] Add support for SO_REUSEPORT to EpollDatagramChannel Motivation: With SO_REUSEPORT it is possible to bind multiple sockets to the same port and so handle the processing of packets via multiple threads. This allows to handle DatagramPackets with more then one thread on the same port and so gives better performance. Modifications: Expose EpollDatagramChannelConfig.setReusePort(..) and isReusePort() Result: Allow to bind multiple times to the same local address and so archive better performance. --- .../epoll/EpollDatagramChannelConfig.java | 27 +++- .../channel/epoll/EpollReuseAddrTest.java | 129 ++++++++++++++++-- 2 files changed, 140 insertions(+), 16 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java index 8a57706b70..8b6166de29 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java @@ -45,7 +45,8 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple ChannelOption.SO_BROADCAST, ChannelOption.SO_RCVBUF, ChannelOption.SO_SNDBUF, ChannelOption.SO_REUSEADDR, ChannelOption.IP_MULTICAST_LOOP_DISABLED, ChannelOption.IP_MULTICAST_ADDR, ChannelOption.IP_MULTICAST_IF, ChannelOption.IP_MULTICAST_TTL, - ChannelOption.IP_TOS, ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION); + ChannelOption.IP_TOS, ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, + EpollChannelOption.SO_REUSEPORT); } @SuppressWarnings("unchecked") @@ -83,6 +84,9 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple if (option == ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) { return (T) Boolean.valueOf(activeOnOpen); } + if (option == EpollChannelOption.SO_REUSEPORT) { + return (T) Boolean.valueOf(isReusePort()); + } return super.getOption(option); } @@ -110,6 +114,8 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple setTrafficClass((Integer) value); } else if (option == ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) { setActiveOnOpen((Boolean) value); + } else if (option == EpollChannelOption.SO_REUSEPORT) { + setReusePort((Boolean) value); } else { return super.setOption(option, value); } @@ -279,6 +285,25 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple throw new UnsupportedOperationException("Multicast not supported"); } + /** + * Returns {@code true} if the SO_REUSEPORT option is set. + */ + public boolean isReusePort() { + return Native.isReusePort(datagramChannel.fd) == 1; + } + + /** + * Set the SO_REUSEPORT option on the underlying Channel. This will allow to bind multiple + * {@link EpollSocketChannel}s to the same port and so accept connections with multiple threads. + * + * Be aware this method needs be called before {@link EpollDatagramChannel#bind(java.net.SocketAddress)} to have + * any affect. + */ + public EpollDatagramChannelConfig setReusePort(boolean reusePort) { + Native.setReusePort(datagramChannel.fd, reusePort ? 1 : 0); + return this; + } + @Override protected void autoReadCleared() { datagramChannel.clearEpollIn(); diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollReuseAddrTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollReuseAddrTest.java index 5b5fc5a5ff..3170b94090 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollReuseAddrTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollReuseAddrTest.java @@ -15,21 +15,30 @@ */ package io.netty.channel.epoll; +import io.netty.bootstrap.AbstractBootstrap; +import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelOption; import io.netty.testsuite.util.TestUtils; +import io.netty.util.NetUtil; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.ResourceLeakDetector; import org.junit.Assert; import org.junit.Assume; import org.junit.Test; import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; public class EpollReuseAddrTest { @@ -53,9 +62,19 @@ public class EpollReuseAddrTest { } @Test - public void testMultipleBindWithoutReusePortFails() { + public void testMultipleBindSocketChannelWithoutReusePortFails() { Assume.assumeTrue(versionEqOrGt(3, 9, 0)); - ServerBootstrap bootstrap = createBootstrap(); + testMultipleBindDatagramChannelWithoutReusePortFails0(createServerBootstrap()); + } + + @Test + public void testMultipleBindDatagramChannelWithoutReusePortFails() { + Assume.assumeTrue(versionEqOrGt(3, 9, 0)); + testMultipleBindDatagramChannelWithoutReusePortFails0(createBootstrap()); + } + + private static void testMultipleBindDatagramChannelWithoutReusePortFails0(AbstractBootstrap bootstrap) { + bootstrap.handler(new DummyHandler()); ChannelFuture future = bootstrap.bind().syncUninterruptibly(); try { bootstrap.bind().syncUninterruptibly(); @@ -67,21 +86,23 @@ public class EpollReuseAddrTest { } @Test(timeout = 10000) - public void testMultipleBind() throws Exception { + public void testMultipleBindSocketChannel() throws Exception { Assume.assumeTrue(versionEqOrGt(3, 9, 0)); - ServerBootstrap bootstrap = createBootstrap(); + ServerBootstrap bootstrap = createServerBootstrap(); bootstrap.option(EpollChannelOption.SO_REUSEPORT, true); final AtomicBoolean accepted1 = new AtomicBoolean(); - bootstrap.childHandler(new TestHandler(accepted1)); + bootstrap.childHandler(new ServerSocketTestHandler(accepted1)); ChannelFuture future = bootstrap.bind().syncUninterruptibly(); + InetSocketAddress address1 = (InetSocketAddress) future.channel().localAddress(); final AtomicBoolean accepted2 = new AtomicBoolean(); - bootstrap.childHandler(new TestHandler(accepted2)); + bootstrap.childHandler(new ServerSocketTestHandler(accepted2)); ChannelFuture future2 = bootstrap.bind().syncUninterruptibly(); - InetSocketAddress address = (InetSocketAddress) future2.channel().localAddress(); + InetSocketAddress address2 = (InetSocketAddress) future2.channel().localAddress(); + Assert.assertEquals(address1, address2); while (!accepted1.get() || !accepted2.get()) { - Socket socket = new Socket(address.getAddress(), address.getPort()); + Socket socket = new Socket(address1.getAddress(), address1.getPort()); socket.setReuseAddress(true); socket.close(); } @@ -89,13 +110,73 @@ public class EpollReuseAddrTest { future2.channel().close().syncUninterruptibly(); } - private ServerBootstrap createBootstrap() { + @Test(timeout = 10000) + public void testMultipleBindDatagramChannel() throws Exception { + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); + Assume.assumeTrue(versionEqOrGt(3, 9, 0)); + Bootstrap bootstrap = createBootstrap(); + bootstrap.option(EpollChannelOption.SO_REUSEPORT, true); + final AtomicBoolean received1 = new AtomicBoolean(); + bootstrap.handler(new DatagramSocketTestHandler(received1)); + ChannelFuture future = bootstrap.bind().syncUninterruptibly(); + final InetSocketAddress address1 = (InetSocketAddress) future.channel().localAddress(); + + final AtomicBoolean received2 = new AtomicBoolean(); + bootstrap.handler(new DatagramSocketTestHandler(received2)); + ChannelFuture future2 = bootstrap.bind().syncUninterruptibly(); + final InetSocketAddress address2 = (InetSocketAddress) future2.channel().localAddress(); + + Assert.assertEquals(address1, address2); + final byte[] bytes = "data".getBytes(); + + // fire up 16 Threads and send DatagramPackets to make sure we stress it enough to see DatagramPackets received + // on both sockets. + int count = 16; + final CountDownLatch latch = new CountDownLatch(count); + Runnable r = new Runnable() { + @Override + public void run() { + try { + DatagramSocket socket = new DatagramSocket(); + while (!received1.get() || !received2.get()) { + socket.send(new DatagramPacket( + bytes, 0, bytes.length, address1.getAddress(), address1.getPort())); + } + socket.close(); + } catch (IOException e) { + e.printStackTrace(); + } + latch.countDown(); + } + }; + + ExecutorService executor = Executors.newFixedThreadPool(count); + for (int i = 0 ; i < count; i++) { + executor.execute(r); + } + latch.await(); + executor.shutdown(); + future.channel().close().syncUninterruptibly(); + future2.channel().close().syncUninterruptibly(); + Assert.assertTrue(received1.get()); + Assert.assertTrue(received2.get()); + } + + private ServerBootstrap createServerBootstrap() { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(EpollSocketTestPermutation.EPOLL_BOSS_GROUP, EpollSocketTestPermutation.EPOLL_WORKER_GROUP); bootstrap.channel(EpollServerSocketChannel.class); - bootstrap.childHandler(new ChannelHandlerAdapter() { }); - bootstrap.option(ChannelOption.SO_REUSEADDR, true); - InetSocketAddress address = new InetSocketAddress(TestUtils.getFreePort()); + bootstrap.childHandler(new DummyHandler()); + InetSocketAddress address = new InetSocketAddress(NetUtil.LOCALHOST, TestUtils.getFreePort()); + bootstrap.localAddress(address); + return bootstrap; + } + + private Bootstrap createBootstrap() { + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(EpollSocketTestPermutation.EPOLL_WORKER_GROUP); + bootstrap.channel(EpollDatagramChannel.class); + InetSocketAddress address = new InetSocketAddress(NetUtil.LOCALHOST, TestUtils.getFreePort()); bootstrap.localAddress(address); return bootstrap; } @@ -116,10 +197,10 @@ public class EpollReuseAddrTest { } @ChannelHandler.Sharable - private static class TestHandler extends ChannelInboundHandlerAdapter { + private static class ServerSocketTestHandler extends ChannelInboundHandlerAdapter { private final AtomicBoolean accepted; - TestHandler(AtomicBoolean accepted) { + ServerSocketTestHandler(AtomicBoolean accepted) { this.accepted = accepted; } @@ -129,4 +210,22 @@ public class EpollReuseAddrTest { ctx.close(); } } + + @ChannelHandler.Sharable + private static class DatagramSocketTestHandler extends ChannelInboundHandlerAdapter { + private final AtomicBoolean received; + + DatagramSocketTestHandler(AtomicBoolean received) { + this.received = received; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ReferenceCountUtil.release(msg); + received.set(true); + } + } + + @ChannelHandler.Sharable + private static final class DummyHandler extends ChannelHandlerAdapter { } }