[#2405] Add support for SO_REUSEPORT to EpollDatagramChannel

Motivation:
With SO_REUSEPORT it is possible to bind multiple sockets to the same port and so handle the processing of packets via multiple threads. This allows to handle DatagramPackets with more then one thread on the same port and so gives better performance.

Modifications:
Expose EpollDatagramChannelConfig.setReusePort(..) and isReusePort()

Result:
Allow to bind multiple times to the same local address and so archive better performance.
This commit is contained in:
Norman Maurer 2014-04-17 11:19:00 +02:00
parent 20ef4690e7
commit 199d2b499c
2 changed files with 140 additions and 16 deletions

View File

@ -45,7 +45,8 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple
ChannelOption.SO_BROADCAST, ChannelOption.SO_RCVBUF, ChannelOption.SO_SNDBUF,
ChannelOption.SO_REUSEADDR, ChannelOption.IP_MULTICAST_LOOP_DISABLED,
ChannelOption.IP_MULTICAST_ADDR, ChannelOption.IP_MULTICAST_IF, ChannelOption.IP_MULTICAST_TTL,
ChannelOption.IP_TOS, ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION);
ChannelOption.IP_TOS, ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION,
EpollChannelOption.SO_REUSEPORT);
}
@SuppressWarnings("unchecked")
@ -83,6 +84,9 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple
if (option == ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) {
return (T) Boolean.valueOf(activeOnOpen);
}
if (option == EpollChannelOption.SO_REUSEPORT) {
return (T) Boolean.valueOf(isReusePort());
}
return super.getOption(option);
}
@ -110,6 +114,8 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple
setTrafficClass((Integer) value);
} else if (option == ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) {
setActiveOnOpen((Boolean) value);
} else if (option == EpollChannelOption.SO_REUSEPORT) {
setReusePort((Boolean) value);
} else {
return super.setOption(option, value);
}
@ -279,6 +285,25 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple
throw new UnsupportedOperationException("Multicast not supported");
}
/**
* Returns {@code true} if the SO_REUSEPORT option is set.
*/
public boolean isReusePort() {
return Native.isReusePort(datagramChannel.fd) == 1;
}
/**
* Set the SO_REUSEPORT option on the underlying Channel. This will allow to bind multiple
* {@link EpollSocketChannel}s to the same port and so accept connections with multiple threads.
*
* Be aware this method needs be called before {@link EpollDatagramChannel#bind(java.net.SocketAddress)} to have
* any affect.
*/
public EpollDatagramChannelConfig setReusePort(boolean reusePort) {
Native.setReusePort(datagramChannel.fd, reusePort ? 1 : 0);
return this;
}
@Override
protected void autoReadCleared() {
datagramChannel.clearEpollIn();

View File

@ -15,21 +15,30 @@
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.AbstractBootstrap;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.testsuite.util.TestUtils;
import io.netty.util.NetUtil;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ResourceLeakDetector;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
public class EpollReuseAddrTest {
@ -53,9 +62,19 @@ public class EpollReuseAddrTest {
}
@Test
public void testMultipleBindWithoutReusePortFails() {
public void testMultipleBindSocketChannelWithoutReusePortFails() {
Assume.assumeTrue(versionEqOrGt(3, 9, 0));
ServerBootstrap bootstrap = createBootstrap();
testMultipleBindDatagramChannelWithoutReusePortFails0(createServerBootstrap());
}
@Test
public void testMultipleBindDatagramChannelWithoutReusePortFails() {
Assume.assumeTrue(versionEqOrGt(3, 9, 0));
testMultipleBindDatagramChannelWithoutReusePortFails0(createBootstrap());
}
private static void testMultipleBindDatagramChannelWithoutReusePortFails0(AbstractBootstrap<?, ?> bootstrap) {
bootstrap.handler(new DummyHandler());
ChannelFuture future = bootstrap.bind().syncUninterruptibly();
try {
bootstrap.bind().syncUninterruptibly();
@ -67,21 +86,23 @@ public class EpollReuseAddrTest {
}
@Test(timeout = 10000)
public void testMultipleBind() throws Exception {
public void testMultipleBindSocketChannel() throws Exception {
Assume.assumeTrue(versionEqOrGt(3, 9, 0));
ServerBootstrap bootstrap = createBootstrap();
ServerBootstrap bootstrap = createServerBootstrap();
bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
final AtomicBoolean accepted1 = new AtomicBoolean();
bootstrap.childHandler(new TestHandler(accepted1));
bootstrap.childHandler(new ServerSocketTestHandler(accepted1));
ChannelFuture future = bootstrap.bind().syncUninterruptibly();
InetSocketAddress address1 = (InetSocketAddress) future.channel().localAddress();
final AtomicBoolean accepted2 = new AtomicBoolean();
bootstrap.childHandler(new TestHandler(accepted2));
bootstrap.childHandler(new ServerSocketTestHandler(accepted2));
ChannelFuture future2 = bootstrap.bind().syncUninterruptibly();
InetSocketAddress address = (InetSocketAddress) future2.channel().localAddress();
InetSocketAddress address2 = (InetSocketAddress) future2.channel().localAddress();
Assert.assertEquals(address1, address2);
while (!accepted1.get() || !accepted2.get()) {
Socket socket = new Socket(address.getAddress(), address.getPort());
Socket socket = new Socket(address1.getAddress(), address1.getPort());
socket.setReuseAddress(true);
socket.close();
}
@ -89,13 +110,73 @@ public class EpollReuseAddrTest {
future2.channel().close().syncUninterruptibly();
}
private ServerBootstrap createBootstrap() {
@Test(timeout = 10000)
public void testMultipleBindDatagramChannel() throws Exception {
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
Assume.assumeTrue(versionEqOrGt(3, 9, 0));
Bootstrap bootstrap = createBootstrap();
bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
final AtomicBoolean received1 = new AtomicBoolean();
bootstrap.handler(new DatagramSocketTestHandler(received1));
ChannelFuture future = bootstrap.bind().syncUninterruptibly();
final InetSocketAddress address1 = (InetSocketAddress) future.channel().localAddress();
final AtomicBoolean received2 = new AtomicBoolean();
bootstrap.handler(new DatagramSocketTestHandler(received2));
ChannelFuture future2 = bootstrap.bind().syncUninterruptibly();
final InetSocketAddress address2 = (InetSocketAddress) future2.channel().localAddress();
Assert.assertEquals(address1, address2);
final byte[] bytes = "data".getBytes();
// fire up 16 Threads and send DatagramPackets to make sure we stress it enough to see DatagramPackets received
// on both sockets.
int count = 16;
final CountDownLatch latch = new CountDownLatch(count);
Runnable r = new Runnable() {
@Override
public void run() {
try {
DatagramSocket socket = new DatagramSocket();
while (!received1.get() || !received2.get()) {
socket.send(new DatagramPacket(
bytes, 0, bytes.length, address1.getAddress(), address1.getPort()));
}
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
latch.countDown();
}
};
ExecutorService executor = Executors.newFixedThreadPool(count);
for (int i = 0 ; i < count; i++) {
executor.execute(r);
}
latch.await();
executor.shutdown();
future.channel().close().syncUninterruptibly();
future2.channel().close().syncUninterruptibly();
Assert.assertTrue(received1.get());
Assert.assertTrue(received2.get());
}
private ServerBootstrap createServerBootstrap() {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(EpollSocketTestPermutation.EPOLL_BOSS_GROUP, EpollSocketTestPermutation.EPOLL_WORKER_GROUP);
bootstrap.channel(EpollServerSocketChannel.class);
bootstrap.childHandler(new ChannelHandlerAdapter() { });
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
InetSocketAddress address = new InetSocketAddress(TestUtils.getFreePort());
bootstrap.childHandler(new DummyHandler());
InetSocketAddress address = new InetSocketAddress(NetUtil.LOCALHOST, TestUtils.getFreePort());
bootstrap.localAddress(address);
return bootstrap;
}
private Bootstrap createBootstrap() {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(EpollSocketTestPermutation.EPOLL_WORKER_GROUP);
bootstrap.channel(EpollDatagramChannel.class);
InetSocketAddress address = new InetSocketAddress(NetUtil.LOCALHOST, TestUtils.getFreePort());
bootstrap.localAddress(address);
return bootstrap;
}
@ -116,10 +197,10 @@ public class EpollReuseAddrTest {
}
@ChannelHandler.Sharable
private static class TestHandler extends ChannelInboundHandlerAdapter {
private static class ServerSocketTestHandler extends ChannelInboundHandlerAdapter {
private final AtomicBoolean accepted;
TestHandler(AtomicBoolean accepted) {
ServerSocketTestHandler(AtomicBoolean accepted) {
this.accepted = accepted;
}
@ -129,4 +210,22 @@ public class EpollReuseAddrTest {
ctx.close();
}
}
@ChannelHandler.Sharable
private static class DatagramSocketTestHandler extends ChannelInboundHandlerAdapter {
private final AtomicBoolean received;
DatagramSocketTestHandler(AtomicBoolean received) {
this.received = received;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ReferenceCountUtil.release(msg);
received.set(true);
}
}
@ChannelHandler.Sharable
private static final class DummyHandler extends ChannelHandlerAdapter { }
}