From 56adab2743e90175c958c1fb0ce8857a851e02ea Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Mon, 15 Feb 2021 13:13:44 +0100 Subject: [PATCH] TCP Fast Open for clients (#11006) Support TCP Fast Open for clients and make SslHandler take advantage Motivation: - TCP Fast Open allow us to send a small amount of data along side the initial SYN packet when establishing a TCP connection. - The TLS Client Hello packet is small enough to fit in there, and is also idempotent (another requirement for using TCP Fast Open), so if we can save a round-trip when establishing TLS connections when using TFO. Modification: - Add support for client-side TCP Fast Open for Epoll, and also lowers the Linux kernel version requirements to 3.6. - When adding the SslHandler to a pipeline, if TCP Fast Open is enabled for the channel (and the channel is not already active) then start the handshake early by writing it to the outbound buffer. - An important detail to note here, is that the outbound buffer is not flushed at this point, like it would for normal handshakes. The flushing happens later as part of establishing the TCP connection. Result: - It is now possible for clients (on epoll) to open connections with TCP Fast Open. - The SslHandler automatically detects when this is the case, and now send its Client Hello message as part of the initial data in the TCP Fast Open flow when available, saving a round-trip when establishing TLS connections. Co-authored-by: Colin Godsey --- .../java/io/netty/handler/ssl/SslHandler.java | 49 +++++---- .../transport/socket/SocketConnectTest.java | 104 +++++++++++++++++- .../socket/SocketMultipleConnectTest.java | 2 +- .../socket/SocketReadPendingTest.java | 2 +- .../socket/SocketTestPermutation.java | 20 ++++ .../src/main/c/netty_epoll_linuxsocket.c | 20 ---- .../channel/epoll/AbstractEpollChannel.java | 41 ++++++- .../channel/epoll/EpollChannelOption.java | 8 +- .../channel/epoll/EpollDatagramChannel.java | 36 +----- .../channel/epoll/EpollSocketChannel.java | 26 +++++ .../epoll/EpollSocketChannelConfig.java | 31 ++---- .../io/netty/channel/epoll/LinuxSocket.java | 10 -- ...pollCompositeBufferGatheringWriteTest.java | 2 +- .../epoll/EpollETSocketAutoReadTest.java | 2 +- ...ollETSocketConditionalWritabilityTest.java | 2 +- ...EpollETSocketDataReadInitialStateTest.java | 2 +- .../EpollETSocketExceptionHandlingTest.java | 2 +- .../epoll/EpollETSocketHalfClosed.java | 2 +- .../epoll/EpollETSocketReadPendingTest.java | 2 +- .../epoll/EpollLTSocketAutoReadTest.java | 2 +- ...ollLTSocketConditionalWritabilityTest.java | 2 +- ...EpollLTSocketDataReadInitialStateTest.java | 2 +- .../EpollLTSocketExceptionHandlingTest.java | 2 +- .../epoll/EpollLTSocketHalfClosed.java | 2 +- .../epoll/EpollLTSocketReadPendingTest.java | 2 +- ...EpollSocketChannelNotYetConnectedTest.java | 2 +- .../epoll/EpollSocketCloseForciblyTest.java | 2 +- .../channel/epoll/EpollSocketConnectTest.java | 9 +- .../channel/epoll/EpollSocketEchoTest.java | 2 +- .../epoll/EpollSocketFileRegionTest.java | 2 +- .../epoll/EpollSocketFixedLengthEchoTest.java | 2 +- .../epoll/EpollSocketGatheringWriteTest.java | 2 +- .../epoll/EpollSocketMultipleConnectTest.java | 2 +- .../epoll/EpollSocketObjectEchoTest.java | 2 +- .../channel/epoll/EpollSocketRstTest.java | 2 +- .../EpollSocketSslClientRenegotiateTest.java | 2 +- .../channel/epoll/EpollSocketSslEchoTest.java | 2 +- .../epoll/EpollSocketSslGreetingTest.java | 2 +- .../epoll/EpollSocketSslSessionReuseTest.java | 2 +- .../epoll/EpollSocketStartTlsTest.java | 2 +- .../epoll/EpollSocketStringEchoTest.java | 2 +- .../epoll/EpollSocketTestPermutation.java | 81 +++++++++----- .../epoll/EpollWriteBeforeRegisteredTest.java | 2 +- .../src/main/c/netty_unix_socket.c | 33 ++++-- .../java/io/netty/channel/unix/Socket.java | 101 ++++++++++++----- .../java/io/netty/channel/ChannelOption.java | 3 +- 46 files changed, 426 insertions(+), 208 deletions(-) diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 933436f034..d46f031034 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -28,6 +28,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; @@ -780,7 +781,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH forceFlush(ctx); // Explicit start handshake processing once we send the first message. This will also ensure // we will schedule the timeout if needed. - startHandshakeProcessing(); + startHandshakeProcessing(true); return; } @@ -1957,20 +1958,26 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; - pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(ctx.channel(), 16); - if (ctx.channel().isActive()) { - startHandshakeProcessing(); + Channel channel = ctx.channel(); + pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(channel, 16); + boolean fastOpen = Boolean.TRUE.equals(channel.config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT)); + boolean active = channel.isActive(); + if (active || fastOpen) { + // Do not flush the handshake when TCP Fast Open is enabled, unless the channel is active. + // With TCP Fast Open, we write to the outbound buffer before the TCP connect is established. + // The buffer will then be flushed as part of estabilishing the connection, saving us a round-trip. + startHandshakeProcessing(active || !fastOpen); } } - private void startHandshakeProcessing() { + private void startHandshakeProcessing(boolean flushAtEnd) { if (!handshakeStarted) { handshakeStarted = true; if (engine.getUseClientMode()) { // Begin the initial handshake. // channelActive() event has been fired already, which means this.channelActive() will // not be invoked. We have to initialize here instead. - handshake(); + handshake(flushAtEnd); } applyHandshakeTimeout(); } @@ -2022,28 +2029,30 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH oldHandshakePromise.addListener(new PromiseNotifier>(newHandshakePromise)); } else { handshakePromise = newHandshakePromise; - handshake(); + handshake(true); applyHandshakeTimeout(); } } /** * Performs TLS (re)negotiation. + * @param flushAtEnd Set to {@code true} if the outbound buffer should be flushed (written to the network) at the + * end. Set to {@code false} if the handshake will be flushed later, e.g. as part of TCP Fast Open + * connect. */ - private void handshake() { + private void handshake(boolean flushAtEnd) { if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) { // Not all SSLEngine implementations support calling beginHandshake multiple times while a handshake // is in progress. See https://github.com/netty/netty/issues/4718. return; - } else { - if (handshakePromise.isDone()) { - // If the handshake is done already lets just return directly as there is no need to trigger it again. - // This can happen if the handshake(...) was triggered before we called channelActive(...) by a - // flush() that was triggered by a ChannelFutureListener that was added to the ChannelFuture returned - // from the connect(...) method. In this case we will see the flush() happen before we had a chance to - // call fireChannelActive() on the pipeline. - return; - } + } + if (handshakePromise.isDone()) { + // If the handshake is done already lets just return directly as there is no need to trigger it again. + // This can happen if the handshake(...) was triggered before we called channelActive(...) by a + // flush() that was triggered by a ChannelFutureListener that was added to the ChannelFuture returned + // from the connect(...) method. In this case we will see the flush() happen before we had a chance to + // call fireChannelActive() on the pipeline. + return; } // Begin handshake. @@ -2054,7 +2063,9 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH } catch (Throwable e) { setHandshakeFailure(ctx, e); } finally { - forceFlush(ctx); + if (flushAtEnd) { + forceFlush(ctx); + } } } @@ -2105,7 +2116,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { if (!startTls) { - startHandshakeProcessing(); + startHandshakeProcessing(true); } ctx.fireChannelActive(); } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectTest.java index 2ac6b8aefe..912cda338f 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectTest.java @@ -17,19 +17,36 @@ package io.netty.testsuite.transport.socket; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.socket.SocketChannel; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; +import io.netty.util.internal.StringUtil; +import org.junit.AssumptionViolatedException; import org.junit.Test; +import java.io.ByteArrayOutputStream; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; -import static org.junit.Assert.*; +import static io.netty.buffer.ByteBufUtil.writeAscii; +import static io.netty.buffer.UnpooledByteBufAllocator.DEFAULT; +import static io.netty.util.CharsetUtil.US_ASCII; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class SocketConnectTest extends AbstractSocketTest { @@ -110,8 +127,93 @@ public class SocketConnectTest extends AbstractSocketTest { } } + @Test(timeout = 3000) + public void testWriteWithFastOpenBeforeConnect() throws Throwable { + run(); + } + + public void testWriteWithFastOpenBeforeConnect(ServerBootstrap sb, Bootstrap cb) throws Throwable { + enableTcpFastOpen(sb, cb); + sb.childOption(ChannelOption.AUTO_READ, true); + cb.option(ChannelOption.AUTO_READ, true); + + sb.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new EchoServerHandler()); + } + }); + + Channel sc = sb.bind().sync().channel(); + connectAndVerifyDataTransfer(cb, sc); + connectAndVerifyDataTransfer(cb, sc); + } + + private static void connectAndVerifyDataTransfer(Bootstrap cb, Channel sc) + throws InterruptedException { + BufferingClientHandler handler = new BufferingClientHandler(); + cb.handler(handler); + ChannelFuture register = cb.register(); + Channel channel = register.sync().channel(); + ChannelFuture write = channel.write(writeAscii(DEFAULT, "[fastopen]")); + SocketAddress remoteAddress = sc.localAddress(); + ChannelFuture connectFuture = channel.connect(remoteAddress); + Channel cc = connectFuture.sync().channel(); + cc.writeAndFlush(writeAscii(DEFAULT, "[normal data]")).sync(); + write.sync(); + String expectedString = "[fastopen][normal data]"; + String result = handler.collectBuffer(expectedString.getBytes(US_ASCII).length); + cc.disconnect().sync(); + assertEquals(expectedString, result); + } + + protected void enableTcpFastOpen(ServerBootstrap sb, Bootstrap cb) { + throw new AssumptionViolatedException( + "Support for testing TCP_FASTOPEN not enabled for " + StringUtil.simpleClassName(this)); + } + private static void assertLocalAddress(InetSocketAddress address) { assertTrue(address.getPort() > 0); assertFalse(address.getAddress().isAnyLocalAddress()); } + + private static class BufferingClientHandler extends ChannelInboundHandlerAdapter { + private final Semaphore semaphore = new Semaphore(0); + private final ByteArrayOutputStream streamBuffer = new ByteArrayOutputStream(); + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + int readableBytes = buf.readableBytes(); + buf.readBytes(streamBuffer, readableBytes); + semaphore.release(readableBytes); + buf.release(); + } else { + throw new IllegalArgumentException("Unexpected message type: " + msg); + } + } + + String collectBuffer(int expectedBytes) throws InterruptedException { + semaphore.acquire(expectedBytes); + byte[] bytes = streamBuffer.toByteArray(); + streamBuffer.reset(); + return new String(bytes, US_ASCII); + } + } + + private static final class EchoServerHandler extends ChannelInboundHandlerAdapter { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof ByteBuf) { + ByteBuf buffer = ctx.alloc().buffer(); + ByteBuf buf = (ByteBuf) msg; + buffer.writeBytes(buf); + buf.release(); + ctx.channel().writeAndFlush(buffer); + } else { + throw new IllegalArgumentException("Unexpected message type: " + msg); + } + } + } } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketMultipleConnectTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketMultipleConnectTest.java index aa6cbdec44..b8b3976ae3 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketMultipleConnectTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketMultipleConnectTest.java @@ -65,7 +65,7 @@ public class SocketMultipleConnectTest extends AbstractSocketTest { List> factories = new ArrayList>(); for (TestsuitePermutation.BootstrapComboFactory comboFactory - : SocketTestPermutation.INSTANCE.socket()) { + : SocketTestPermutation.INSTANCE.socketWithFastOpen()) { if (comboFactory.newClientInstance().config().group() instanceof NioEventLoopGroup) { factories.add(comboFactory); } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketReadPendingTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketReadPendingTest.java index bfd3c946c6..ed3e3fa17c 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketReadPendingTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketReadPendingTest.java @@ -40,7 +40,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class SocketReadPendingTest extends AbstractSocketTest { - @Test(timeout = 30000) + @Test(timeout = 60000) public void testReadPendingIsResetAfterEachRead() throws Throwable { run(); } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java index 4623dd3e66..cace51b2ec 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java @@ -113,6 +113,22 @@ public class SocketTestPermutation { return list; } + public List> socketWithFastOpen() { + // Make the list of ServerBootstrap factories. + List> sbfs = serverSocket(); + + // Make the list of Bootstrap factories. + List> cbfs = clientSocketWithFastOpen(); + + // Populate the combinations + List> list = combo(sbfs, cbfs); + + // Remove the OIO-OIO case which often leads to a dead lock by its nature. + list.remove(list.size() - 1); + + return list; + } + public List> datagram(final InternetProtocolFamily family) { // Make the list of Bootstrap factories. List> bfs = Arrays.asList( @@ -183,6 +199,10 @@ public class SocketTestPermutation { ); } + public List> clientSocketWithFastOpen() { + return clientSocket(); + } + public List> datagramSocket() { return Arrays.asList( new BootstrapFactory() { diff --git a/transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c b/transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c index 16453338aa..6f5a245bd4 100644 --- a/transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c +++ b/transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c @@ -124,10 +124,6 @@ static void netty_epoll_linuxsocket_setTcpFastOpen(JNIEnv* env, jclass clazz, ji netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_FASTOPEN, &optval, sizeof(optval)); } -static void netty_epoll_linuxsocket_setTcpFastOpenConnect(JNIEnv* env, jclass clazz, jint fd, jint optval) { - netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_FASTOPEN_CONNECT, &optval, sizeof(optval)); -} - static void netty_epoll_linuxsocket_setTcpKeepIdle(JNIEnv* env, jclass clazz, jint fd, jint optval) { netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_KEEPIDLE, &optval, sizeof(optval)); } @@ -596,20 +592,6 @@ static jint netty_epoll_linuxsocket_isTcpQuickAck(JNIEnv* env, jclass clazz, jin return optval; } -static jint netty_epoll_linuxsocket_isTcpFastOpenConnect(JNIEnv* env, jclass clazz, jint fd) { - int optval; - // We call netty_unix_socket_getOption0 directly so we can handle ENOPROTOOPT by ourself. - if (netty_unix_socket_getOption0(fd, IPPROTO_TCP, TCP_FASTOPEN_CONNECT, &optval, sizeof(optval)) == -1) { - if (errno == ENOPROTOOPT) { - // Not supported by the system, so just return 0. - return 0; - } - netty_unix_socket_getOptionHandleError(env, errno); - return -1; - } - return optval; -} - static jint netty_epoll_linuxsocket_getTcpNotSentLowAt(JNIEnv* env, jclass clazz, jint fd) { int optval; if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &optval, sizeof(optval)) == -1) { @@ -681,8 +663,6 @@ static const JNINativeMethod fixed_method_table[] = { { "getTcpNotSentLowAt", "(I)I", (void *) netty_epoll_linuxsocket_getTcpNotSentLowAt }, { "isTcpQuickAck", "(I)I", (void *) netty_epoll_linuxsocket_isTcpQuickAck }, { "setTcpFastOpen", "(II)V", (void *) netty_epoll_linuxsocket_setTcpFastOpen }, - { "setTcpFastOpenConnect", "(II)V", (void *) netty_epoll_linuxsocket_setTcpFastOpenConnect }, - { "isTcpFastOpenConnect", "(I)I", (void *) netty_epoll_linuxsocket_isTcpFastOpenConnect }, { "setTcpKeepIdle", "(II)V", (void *) netty_epoll_linuxsocket_setTcpKeepIdle }, { "setTcpKeepIntvl", "(II)V", (void *) netty_epoll_linuxsocket_setTcpKeepIntvl }, { "setTcpKeepCnt", "(II)V", (void *) netty_epoll_linuxsocket_setTcpKeepCnt }, diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 2f39adc89c..3589ac818f 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -31,11 +31,11 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.ConnectTimeoutException; import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; -import io.netty.channel.nio.AbstractNioChannel; import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.ChannelInputShutdownReadComplete; import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.unix.FileDescriptor; +import io.netty.channel.unix.IovArray; import io.netty.channel.unix.Socket; import io.netty.channel.unix.UnixChannel; import io.netty.util.ReferenceCountUtil; @@ -377,6 +377,43 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann return WRITE_STATUS_SNDBUF_FULL; } + /** + * Write bytes to the socket, with or without a remote address. + * Used for datagram and TCP client fast open writes. + */ + final long doWriteOrSendBytes(ByteBuf data, InetSocketAddress remoteAddress, boolean fastOpen) + throws IOException { + assert !(fastOpen && remoteAddress == null) : "fastOpen requires a remote address"; + if (data.hasMemoryAddress()) { + long memoryAddress = data.memoryAddress(); + if (remoteAddress == null) { + return socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex()); + } + return socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(), + remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen); + } + + if (data.nioBufferCount() > 1) { + IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray(); + array.add(data, data.readerIndex(), data.readableBytes()); + int cnt = array.count(); + assert cnt != 0; + + if (remoteAddress == null) { + return socket.writevAddresses(array.memoryAddress(0), cnt); + } + return socket.sendToAddresses(array.memoryAddress(0), cnt, + remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen); + } + + ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes()); + if (remoteAddress == null) { + return socket.write(nioData, nioData.position(), nioData.limit()); + } + return socket.sendTo(nioData, nioData.position(), nioData.limit(), + remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen); + } + protected abstract class AbstractEpollUnsafe extends AbstractUnsafe { boolean readPending; boolean maybeMoreDataToRead; @@ -730,7 +767,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann return connected; } - private boolean doConnect0(SocketAddress remote) throws Exception { + boolean doConnect0(SocketAddress remote) throws Exception { boolean success = false; try { boolean connected = socket.connect(remote); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java index efc6e1149c..7eff9754be 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java @@ -33,8 +33,12 @@ public final class EpollChannelOption extends UnixChannelOption { public static final ChannelOption IP_TRANSPARENT = valueOf("IP_TRANSPARENT"); public static final ChannelOption IP_RECVORIGDSTADDR = valueOf("IP_RECVORIGDSTADDR"); public static final ChannelOption TCP_FASTOPEN = valueOf(EpollChannelOption.class, "TCP_FASTOPEN"); - public static final ChannelOption TCP_FASTOPEN_CONNECT = - valueOf(EpollChannelOption.class, "TCP_FASTOPEN_CONNECT"); + + /** + * @deprecated Use {@link ChannelOption#TCP_FASTOPEN_CONNECT} instead. + */ + @Deprecated + public static final ChannelOption TCP_FASTOPEN_CONNECT = ChannelOption.TCP_FASTOPEN_CONNECT; public static final ChannelOption TCP_DEFER_ACCEPT = ChannelOption.valueOf(EpollChannelOption.class, "TCP_DEFER_ACCEPT"); public static final ChannelOption TCP_QUICKACK = valueOf(EpollChannelOption.class, "TCP_QUICKACK"); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index 9ac2c88f23..0736a6fc9d 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -32,7 +32,6 @@ import io.netty.channel.socket.InternetProtocolFamily; import io.netty.channel.unix.DatagramSocketAddress; import io.netty.channel.unix.Errors; import io.netty.channel.unix.Errors.NativeIoException; -import io.netty.channel.unix.IovArray; import io.netty.channel.unix.Socket; import io.netty.channel.unix.UnixChannelUtil; import io.netty.util.ReferenceCountUtil; @@ -350,7 +349,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements private boolean doWriteMessage(Object msg) throws Exception { final ByteBuf data; - InetSocketAddress remoteAddress; + final InetSocketAddress remoteAddress; if (msg instanceof AddressedEnvelope) { @SuppressWarnings("unchecked") AddressedEnvelope envelope = @@ -367,38 +366,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements return true; } - final long writtenBytes; - if (data.hasMemoryAddress()) { - long memoryAddress = data.memoryAddress(); - if (remoteAddress == null) { - writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex()); - } else { - writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(), - remoteAddress.getAddress(), remoteAddress.getPort()); - } - } else if (data.nioBufferCount() > 1) { - IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray(); - array.add(data, data.readerIndex(), data.readableBytes()); - int cnt = array.count(); - assert cnt != 0; - - if (remoteAddress == null) { - writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt); - } else { - writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt, - remoteAddress.getAddress(), remoteAddress.getPort()); - } - } else { - ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes()); - if (remoteAddress == null) { - writtenBytes = socket.write(nioData, nioData.position(), nioData.limit()); - } else { - writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(), - remoteAddress.getAddress(), remoteAddress.getPort()); - } - } - - return writtenBytes > 0; + return doWriteOrSendBytes(data, remoteAddress, false) > 0; } @Override diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index ab0efcb819..5e20307a5d 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -15,8 +15,10 @@ */ package io.netty.channel.epoll; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelException; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; import io.netty.util.concurrent.GlobalEventExecutor; @@ -24,6 +26,7 @@ import io.netty.util.concurrent.GlobalEventExecutor; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -111,6 +114,29 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme return new EpollSocketChannelUnsafe(); } + @Override + boolean doConnect0(SocketAddress remote) throws Exception { + if (Native.IS_SUPPORTING_TCP_FASTOPEN && config.isTcpFastOpenConnect()) { + ChannelOutboundBuffer outbound = unsafe().outboundBuffer(); + outbound.addFlush(); + Object curr; + if ((curr = outbound.current()) instanceof ByteBuf) { + ByteBuf initialData = (ByteBuf) curr; + // If no cookie is present, the write fails with EINPROGRESS and this call basically + // becomes a normal async connect. All writes will be sent normally afterwards. + long localFlushedAmount = doWriteOrSendBytes( + initialData, (InetSocketAddress) remote, true); + if (localFlushedAmount > 0) { + // We had a cookie and our fast-open proceeded. Remove written data + // then continue with normal TCP operation. + outbound.removeBytes(localFlushedAmount); + return true; + } + } + } + return super.doConnect0(remote); + } + private final class EpollSocketChannelUnsafe extends EpollStreamUnsafe { @Override protected Executor prepareToClose() { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java index b330013111..ef341514db 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java @@ -39,6 +39,7 @@ import static io.netty.channel.ChannelOption.TCP_NODELAY; public final class EpollSocketChannelConfig extends EpollChannelConfig implements SocketChannelConfig { private volatile boolean allowHalfClosure; + private volatile boolean tcpFastopen; /** * Creates a new instance. @@ -60,7 +61,7 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement ALLOW_HALF_CLOSURE, EpollChannelOption.TCP_CORK, EpollChannelOption.TCP_NOTSENT_LOWAT, EpollChannelOption.TCP_KEEPCNT, EpollChannelOption.TCP_KEEPIDLE, EpollChannelOption.TCP_KEEPINTVL, EpollChannelOption.TCP_MD5SIG, EpollChannelOption.TCP_QUICKACK, EpollChannelOption.IP_TRANSPARENT, - EpollChannelOption.TCP_FASTOPEN_CONNECT, EpollChannelOption.SO_BUSY_POLL); + ChannelOption.TCP_FASTOPEN_CONNECT, EpollChannelOption.SO_BUSY_POLL); } @SuppressWarnings("unchecked") @@ -114,7 +115,7 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement if (option == EpollChannelOption.IP_TRANSPARENT) { return (T) Boolean.valueOf(isIpTransparent()); } - if (option == EpollChannelOption.TCP_FASTOPEN_CONNECT) { + if (option == ChannelOption.TCP_FASTOPEN_CONNECT) { return (T) Boolean.valueOf(isTcpFastOpenConnect()); } if (option == EpollChannelOption.SO_BUSY_POLL) { @@ -163,7 +164,7 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement setTcpMd5Sig(m); } else if (option == EpollChannelOption.TCP_QUICKACK) { setTcpQuickAck((Boolean) value); - } else if (option == EpollChannelOption.TCP_FASTOPEN_CONNECT) { + } else if (option == ChannelOption.TCP_FASTOPEN_CONNECT) { setTcpFastOpenConnect((Boolean) value); } else if (option == EpollChannelOption.SO_BUSY_POLL) { setSoBusyPoll((Integer) value); @@ -551,29 +552,21 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement } /** - * Set the {@code TCP_FASTOPEN_CONNECT} option on the socket. Requires Linux kernel 4.11 or later. - * See - * this commit - * for more details. + * Enables client TCP fast open. {@code TCP_FASTOPEN_CONNECT} normally + * requires Linux kernel 4.11 or later, so instead we use the traditional fast open + * client socket mechanics that work with kernel 3.6 and later. See this + * LWN article for more info. */ public EpollSocketChannelConfig setTcpFastOpenConnect(boolean fastOpenConnect) { - try { - ((EpollSocketChannel) channel).socket.setTcpFastOpenConnect(fastOpenConnect); - return this; - } catch (IOException e) { - throw new ChannelException(e); - } + tcpFastopen = fastOpenConnect; + return this; } /** - * Returns {@code true} if {@code TCP_FASTOPEN_CONNECT} is enabled, {@code false} otherwise. + * Returns {@code true} if TCP fast open is enabled, {@code false} otherwise. */ public boolean isTcpFastOpenConnect() { - try { - return ((EpollSocketChannel) channel).socket.isTcpFastOpenConnect(); - } catch (IOException e) { - throw new ChannelException(e); - } + return tcpFastopen; } @Override diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/LinuxSocket.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/LinuxSocket.java index 6871bcd79e..62283fb5dc 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/LinuxSocket.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/LinuxSocket.java @@ -179,14 +179,6 @@ final class LinuxSocket extends Socket { setTcpFastOpen(intValue(), tcpFastopenBacklog); } - void setTcpFastOpenConnect(boolean tcpFastOpenConnect) throws IOException { - setTcpFastOpenConnect(intValue(), tcpFastOpenConnect ? 1 : 0); - } - - boolean isTcpFastOpenConnect() throws IOException { - return isTcpFastOpenConnect(intValue()) != 0; - } - void setTcpKeepIdle(int seconds) throws IOException { setTcpKeepIdle(intValue(), seconds); } @@ -369,7 +361,6 @@ final class LinuxSocket extends Socket { private static native int isIpRecvOrigDestAddr(int fd) throws IOException; private static native void getTcpInfo(int fd, long[] array) throws IOException; private static native PeerCredentials getPeerCredentials(int fd) throws IOException; - private static native int isTcpFastOpenConnect(int fd) throws IOException; private static native void setTcpDeferAccept(int fd, int deferAccept) throws IOException; private static native void setTcpQuickAck(int fd, int quickAck) throws IOException; @@ -377,7 +368,6 @@ final class LinuxSocket extends Socket { private static native void setSoBusyPoll(int fd, int loopMicros) throws IOException; private static native void setTcpNotSentLowAt(int fd, int tcpNotSentLowAt) throws IOException; private static native void setTcpFastOpen(int fd, int tcpFastopenBacklog) throws IOException; - private static native void setTcpFastOpenConnect(int fd, int tcpFastOpenConnect) throws IOException; private static native void setTcpKeepIdle(int fd, int seconds) throws IOException; private static native void setTcpKeepIntvl(int fd, int seconds) throws IOException; private static native void setTcpKeepCnt(int fd, int probes) throws IOException; diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollCompositeBufferGatheringWriteTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollCompositeBufferGatheringWriteTest.java index 4749a438ba..17dd91e408 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollCompositeBufferGatheringWriteTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollCompositeBufferGatheringWriteTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollCompositeBufferGatheringWriteTest extends CompositeBufferGatheringWriteTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketAutoReadTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketAutoReadTest.java index 9a6b2b7c53..330aa57146 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketAutoReadTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketAutoReadTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollETSocketAutoReadTest extends SocketAutoReadTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketConditionalWritabilityTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketConditionalWritabilityTest.java index 7f79770e5e..ffff608d74 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketConditionalWritabilityTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketConditionalWritabilityTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollETSocketConditionalWritabilityTest extends SocketConditionalWritabilityTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketDataReadInitialStateTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketDataReadInitialStateTest.java index 98659f96ed..b246234d58 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketDataReadInitialStateTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketDataReadInitialStateTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollETSocketDataReadInitialStateTest extends SocketDataReadInitialStateTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketExceptionHandlingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketExceptionHandlingTest.java index ac3d2f12b9..06beed2b86 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketExceptionHandlingTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketExceptionHandlingTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollETSocketExceptionHandlingTest extends SocketExceptionHandlingTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketHalfClosed.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketHalfClosed.java index c2dfebfa07..74ae17a9ff 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketHalfClosed.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketHalfClosed.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollETSocketHalfClosed extends SocketHalfClosedTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketReadPendingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketReadPendingTest.java index 6b4e856a81..7fe3ebc6ec 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketReadPendingTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketReadPendingTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollETSocketReadPendingTest extends SocketReadPendingTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketAutoReadTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketAutoReadTest.java index 426159fde7..8f6b794f22 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketAutoReadTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketAutoReadTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollLTSocketAutoReadTest extends SocketAutoReadTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketConditionalWritabilityTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketConditionalWritabilityTest.java index fdcf7ebf3a..9bfb25804c 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketConditionalWritabilityTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketConditionalWritabilityTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollLTSocketConditionalWritabilityTest extends SocketConditionalWritabilityTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketDataReadInitialStateTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketDataReadInitialStateTest.java index bca531909c..5d484390c8 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketDataReadInitialStateTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketDataReadInitialStateTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollLTSocketDataReadInitialStateTest extends SocketDataReadInitialStateTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketExceptionHandlingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketExceptionHandlingTest.java index 841450ecf2..f11635eabf 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketExceptionHandlingTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketExceptionHandlingTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollLTSocketExceptionHandlingTest extends SocketExceptionHandlingTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketHalfClosed.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketHalfClosed.java index f8e7aa6a0f..cbf3dbfeb9 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketHalfClosed.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketHalfClosed.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollLTSocketHalfClosed extends SocketHalfClosedTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketReadPendingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketReadPendingTest.java index 03546541dd..46b8b1b77a 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketReadPendingTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketReadPendingTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollLTSocketReadPendingTest extends SocketReadPendingTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelNotYetConnectedTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelNotYetConnectedTest.java index ab7e6bf5b7..5cbfaf08df 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelNotYetConnectedTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelNotYetConnectedTest.java @@ -24,6 +24,6 @@ import java.util.List; public class EpollSocketChannelNotYetConnectedTest extends SocketChannelNotYetConnectedTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.clientSocket(); + return EpollSocketTestPermutation.INSTANCE.clientSocketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketCloseForciblyTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketCloseForciblyTest.java index 657a03462c..43b561e95e 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketCloseForciblyTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketCloseForciblyTest.java @@ -25,6 +25,6 @@ import java.util.List; public class EpollSocketCloseForciblyTest extends SocketCloseForciblyTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketConnectTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketConnectTest.java index 93742ca23a..6c6b32c790 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketConnectTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketConnectTest.java @@ -17,6 +17,7 @@ package io.netty.channel.epoll; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelOption; import io.netty.testsuite.transport.TestsuitePermutation; import io.netty.testsuite.transport.socket.SocketConnectTest; @@ -26,6 +27,12 @@ public class EpollSocketConnectTest extends SocketConnectTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen(); + } + + @Override + protected void enableTcpFastOpen(ServerBootstrap sb, Bootstrap cb) { + sb.childOption(EpollChannelOption.TCP_FASTOPEN, 5); + cb.option(ChannelOption.TCP_FASTOPEN_CONNECT, true); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketEchoTest.java index d4a889248f..61e89faa79 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketEchoTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketEchoTest.java @@ -26,6 +26,6 @@ public class EpollSocketEchoTest extends SocketEchoTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFileRegionTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFileRegionTest.java index f4c14d0de7..a28ae007bd 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFileRegionTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFileRegionTest.java @@ -26,6 +26,6 @@ public class EpollSocketFileRegionTest extends SocketFileRegionTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFixedLengthEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFixedLengthEchoTest.java index 7d6ef9faf0..92cb620de3 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFixedLengthEchoTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFixedLengthEchoTest.java @@ -26,6 +26,6 @@ public class EpollSocketFixedLengthEchoTest extends SocketFixedLengthEchoTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketGatheringWriteTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketGatheringWriteTest.java index 5cea4caf51..7ae8a5aed7 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketGatheringWriteTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketGatheringWriteTest.java @@ -26,6 +26,6 @@ public class EpollSocketGatheringWriteTest extends SocketGatheringWriteTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketMultipleConnectTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketMultipleConnectTest.java index 8e50c72b96..b70313545e 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketMultipleConnectTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketMultipleConnectTest.java @@ -32,7 +32,7 @@ public class EpollSocketMultipleConnectTest extends SocketMultipleConnectTest { List> factories = new ArrayList>(); for (TestsuitePermutation.BootstrapComboFactory comboFactory - : EpollSocketTestPermutation.INSTANCE.socket()) { + : EpollSocketTestPermutation.INSTANCE.socketWithFastOpen()) { EventLoopGroup group = comboFactory.newClientInstance().config().group(); if (group instanceof NioEventLoopGroup || group instanceof EpollEventLoopGroup) { factories.add(comboFactory); diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketObjectEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketObjectEchoTest.java index 20141e8538..78e7527fce 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketObjectEchoTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketObjectEchoTest.java @@ -26,6 +26,6 @@ public class EpollSocketObjectEchoTest extends SocketObjectEchoTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketRstTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketRstTest.java index 5aef606b59..e26066dd56 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketRstTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketRstTest.java @@ -32,7 +32,7 @@ import static org.junit.Assert.assertTrue; public class EpollSocketRstTest extends SocketRstTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslClientRenegotiateTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslClientRenegotiateTest.java index 98465c8620..26ed3dc4da 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslClientRenegotiateTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslClientRenegotiateTest.java @@ -31,6 +31,6 @@ public class EpollSocketSslClientRenegotiateTest extends SocketSslClientRenegoti @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslEchoTest.java index 7b41a2e061..f56a769177 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslEchoTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslEchoTest.java @@ -36,6 +36,6 @@ public class EpollSocketSslEchoTest extends SocketSslEchoTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslGreetingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslGreetingTest.java index 3030bc3f01..cd294d1446 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslGreetingTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslGreetingTest.java @@ -31,6 +31,6 @@ public class EpollSocketSslGreetingTest extends SocketSslGreetingTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslSessionReuseTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslSessionReuseTest.java index 1a3f09f7db..35c4d149ec 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslSessionReuseTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslSessionReuseTest.java @@ -31,6 +31,6 @@ public class EpollSocketSslSessionReuseTest extends SocketSslSessionReuseTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStartTlsTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStartTlsTest.java index 0fe27d07e7..00d2f49499 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStartTlsTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStartTlsTest.java @@ -31,6 +31,6 @@ public class EpollSocketStartTlsTest extends SocketStartTlsTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStringEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStringEchoTest.java index 7aa42a7064..b25a40af16 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStringEchoTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStringEchoTest.java @@ -26,6 +26,6 @@ public class EpollSocketStringEchoTest extends SocketStringEchoTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java index c734182e68..66d79da145 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java @@ -19,6 +19,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.InternetProtocolFamily; import io.netty.channel.socket.nio.NioDatagramChannel; @@ -53,10 +54,21 @@ class EpollSocketTestPermutation extends SocketTestPermutation { new EpollEventLoopGroup(WORKERS, new DefaultThreadFactory("testsuite-epoll-worker", true)); private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollSocketTestPermutation.class); + // Constants describing if/how TCP Fast Open is allowed to work on Linux: + private static final int TFO_ENABLED_CLIENT = 1; + private static final int TFO_ENABLED_SERVER = 2; @Override public List> socket() { + List> list = + combo(serverSocket(), clientSocketWithFastOpen()); + list.remove(list.size() - 1); // Exclude NIO x NIO test + + return list; + } + + public List> socketWithoutFastOpen() { List> list = combo(serverSocket(), clientSocket()); @@ -76,7 +88,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation { .channel(EpollServerSocketChannel.class); } }); - if (isServerFastOpen()) { + if (isFastOpen()) { toReturn.add(new BootstrapFactory() { @Override public ServerBootstrap newInstance() { @@ -98,30 +110,49 @@ class EpollSocketTestPermutation extends SocketTestPermutation { return toReturn; } - @SuppressWarnings("unchecked") @Override public List> clientSocket() { - return Arrays.asList( - new BootstrapFactory() { - @Override - public Bootstrap newInstance() { - return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollSocketChannel.class); - } - }, - new BootstrapFactory() { - @Override - public Bootstrap newInstance() { - return new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class); - } + List> toReturn = new ArrayList>(); + + toReturn.add(new BootstrapFactory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollSocketChannel.class); + } + }); + + toReturn.add(new BootstrapFactory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class); + } + }); + + return toReturn; + } + + @Override + public List> clientSocketWithFastOpen() { + List> factories = clientSocket(); + + if (isFastOpen()) { + int insertIndex = factories.size() - 1; // Keep NIO fixture last. + factories.add(insertIndex, new BootstrapFactory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollSocketChannel.class) + .option(ChannelOption.TCP_FASTOPEN_CONNECT, true); } - ); + }); + } + + return factories; } @Override public List> datagram( final InternetProtocolFamily family) { // Make the list of Bootstrap factories. - @SuppressWarnings("unchecked") List> bfs = Arrays.asList( new BootstrapFactory() { @Override @@ -165,7 +196,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation { Collections.singletonList(datagramBootstrapFactory(family))); } - private BootstrapFactory datagramBootstrapFactory(final InternetProtocolFamily family) { + private static BootstrapFactory datagramBootstrapFactory(final InternetProtocolFamily family) { return new BootstrapFactory() { @Override public Bootstrap newInstance() { @@ -226,8 +257,8 @@ class EpollSocketTestPermutation extends SocketTestPermutation { ); } - public boolean isServerFastOpen() { - return AccessController.doPrivileged(new PrivilegedAction() { + public boolean isFastOpen() { + int tfoEnabled = AccessController.doPrivileged(new PrivilegedAction() { @Override public Integer run() { int fastopen = 0; @@ -237,9 +268,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation { try { in = new BufferedReader(new FileReader(file)); fastopen = Integer.parseInt(in.readLine()); - if (logger.isDebugEnabled()) { - logger.debug("{}: {}", file, fastopen); - } + logger.debug("{}: {}", file, fastopen); } catch (Exception e) { logger.debug("Failed to get TCP_FASTOPEN from: {}", file, e); } finally { @@ -252,13 +281,13 @@ class EpollSocketTestPermutation extends SocketTestPermutation { } } } else { - if (logger.isDebugEnabled()) { - logger.debug("{}: {} (non-existent)", file, fastopen); - } + logger.debug("{}: {} (non-existent)", file, fastopen); } return fastopen; } - }) == 3; + }); + // TCP Fast Open needs to be enabled for both clients and servers, before we can test our intergration with it. + return tfoEnabled == TFO_ENABLED_CLIENT + TFO_ENABLED_SERVER; } public static DomainSocketAddress newSocketAddress() { diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollWriteBeforeRegisteredTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollWriteBeforeRegisteredTest.java index 975ee69531..7e93faf0cd 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollWriteBeforeRegisteredTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollWriteBeforeRegisteredTest.java @@ -25,6 +25,6 @@ public class EpollWriteBeforeRegisteredTest extends WriteBeforeRegisteredTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.clientSocket(); + return EpollSocketTestPermutation.INSTANCE.clientSocketWithFastOpen(); } } diff --git a/transport-native-unix-common/src/main/c/netty_unix_socket.c b/transport-native-unix-common/src/main/c/netty_unix_socket.c index da313eb65f..4243fbd288 100644 --- a/transport-native-unix-common/src/main/c/netty_unix_socket.c +++ b/transport-native-unix-common/src/main/c/netty_unix_socket.c @@ -39,6 +39,11 @@ #define SO_REUSEPORT 15 #endif /* SO_REUSEPORT */ +// MSG_FASTOPEN is defined in linux 3.6. We define this here so older kernels can compile. +#ifndef MSG_FASTOPEN +#define MSG_FASTOPEN 0x20000000 +#endif + static jclass datagramSocketAddressClass = NULL; static jmethodID datagramSocketAddrMethodId = NULL; static jmethodID inetSocketAddrMethodId = NULL; @@ -309,7 +314,7 @@ int netty_unix_socket_initSockaddr(JNIEnv* env, jboolean ipv6, jbyteArray addres return 0; } -static jint _sendTo(JNIEnv* env, jint fd, jboolean ipv6, void* buffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) { +static jint _sendTo(JNIEnv* env, jint fd, jboolean ipv6, void* buffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port, jint flags) { struct sockaddr_storage addr; socklen_t addrSize; if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr, &addrSize) == -1) { @@ -319,7 +324,7 @@ static jint _sendTo(JNIEnv* env, jint fd, jboolean ipv6, void* buffer, jint pos, ssize_t res; int err; do { - res = sendto(fd, buffer + pos, (size_t) (limit - pos), 0, (struct sockaddr*) &addr, addrSize); + res = sendto(fd, buffer + pos, (size_t) (limit - pos), flags, (struct sockaddr*) &addr, addrSize); // keep on writing if it was interrupted } while (res == -1 && ((err = errno) == EINTR)); @@ -623,16 +628,16 @@ static jint netty_unix_socket_newSocketDomainFd(JNIEnv* env, jclass clazz) { return fd; } -static jint netty_unix_socket_sendTo(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) { +static jint netty_unix_socket_sendTo(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port, jint flags) { // We check that GetDirectBufferAddress will not return NULL in OnLoad - return _sendTo(env, fd, ipv6, (*env)->GetDirectBufferAddress(env, jbuffer), pos, limit, address, scopeId, port); + return _sendTo(env, fd, ipv6, (*env)->GetDirectBufferAddress(env, jbuffer), pos, limit, address, scopeId, port, flags); } -static jint netty_unix_socket_sendToAddress(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) { - return _sendTo(env, fd, ipv6, (void *) (intptr_t) memoryAddress, pos, limit, address, scopeId, port); +static jint netty_unix_socket_sendToAddress(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port, jint flags) { + return _sendTo(env, fd, ipv6, (void *) (intptr_t) memoryAddress, pos, limit, address, scopeId, port, flags); } -static jint netty_unix_socket_sendToAddresses(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port) { +static jint netty_unix_socket_sendToAddresses(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port, jint flags) { struct sockaddr_storage addr; socklen_t addrSize; if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr, &addrSize) == -1) { @@ -648,7 +653,7 @@ static jint netty_unix_socket_sendToAddresses(JNIEnv* env, jclass clazz, jint fd ssize_t res; int err; do { - res = sendmsg(fd, &m, 0); + res = sendmsg(fd, &m, flags); // keep on writing if it was interrupted } while (res == -1 && ((err = errno) == EINTR)); @@ -965,6 +970,9 @@ static jint netty_unix_socket_isBroadcast(JNIEnv* env, jclass clazz, jint fd) { return optval; } +static jint netty_unit_socket_msgFastopen(JNIEnv* env, jclass clazz) { + return MSG_FASTOPEN; +} // JNI Registered Methods End @@ -982,9 +990,9 @@ static const JNINativeMethod fixed_method_table[] = { { "newSocketDgramFd", "(Z)I", (void *) netty_unix_socket_newSocketDgramFd }, { "newSocketStreamFd", "(Z)I", (void *) netty_unix_socket_newSocketStreamFd }, { "newSocketDomainFd", "()I", (void *) netty_unix_socket_newSocketDomainFd }, - { "sendTo", "(IZLjava/nio/ByteBuffer;II[BII)I", (void *) netty_unix_socket_sendTo }, - { "sendToAddress", "(IZJII[BII)I", (void *) netty_unix_socket_sendToAddress }, - { "sendToAddresses", "(IZJI[BII)I", (void *) netty_unix_socket_sendToAddresses }, + { "sendTo", "(IZLjava/nio/ByteBuffer;II[BIII)I", (void *) netty_unix_socket_sendTo }, + { "sendToAddress", "(IZJII[BIII)I", (void *) netty_unix_socket_sendToAddress }, + { "sendToAddresses", "(IZJI[BIII)I", (void *) netty_unix_socket_sendToAddresses }, // "recvFrom" has a dynamic signature // "recvFromAddress" has a dynamic signature { "recvFd", "(I)I", (void *) netty_unix_socket_recvFd }, @@ -1012,7 +1020,8 @@ static const JNINativeMethod fixed_method_table[] = { { "getSoError", "(I)I", (void *) netty_unix_socket_getSoError }, { "initialize", "(Z)V", (void *) netty_unix_socket_initialize }, { "isIPv6Preferred", "()Z", (void *) netty_unix_socket_isIPv6Preferred }, - { "isIPv6", "(I)Z", (void *) netty_unix_socket_isIPv6 } + { "isIPv6", "(I)Z", (void *) netty_unix_socket_isIPv6 }, + { "msgFastopen", "()I", (void *) netty_unit_socket_msgFastopen } }; static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]); diff --git a/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java b/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java index 602872891d..2a1ffa65d3 100644 --- a/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java +++ b/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java @@ -113,29 +113,10 @@ public class Socket extends FileDescriptor { } public final int sendTo(ByteBuffer buf, int pos, int limit, InetAddress addr, int port) throws IOException { - // just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected - // to be called frequently - byte[] address; - int scopeId; - if (addr instanceof Inet6Address) { - address = addr.getAddress(); - scopeId = ((Inet6Address) addr).getScopeId(); - } else { - // convert to ipv4 mapped ipv6 address; - scopeId = 0; - address = ipv4MappedIpv6Address(addr.getAddress()); - } - int res = sendTo(fd, useIpv6(addr), buf, pos, limit, address, scopeId, port); - if (res >= 0) { - return res; - } - if (res == ERROR_ECONNREFUSED_NEGATIVE) { - throw new PortUnreachableException("sendTo failed"); - } - return ioResult("sendTo", res); + return sendTo(buf, pos, limit, addr, port, false); } - public final int sendToAddress(long memoryAddress, int pos, int limit, InetAddress addr, int port) + public final int sendTo(ByteBuffer buf, int pos, int limit, InetAddress addr, int port, boolean fastOpen) throws IOException { // just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected // to be called frequently @@ -149,17 +130,30 @@ public class Socket extends FileDescriptor { scopeId = 0; address = ipv4MappedIpv6Address(addr.getAddress()); } - int res = sendToAddress(fd, useIpv6(addr), memoryAddress, pos, limit, address, scopeId, port); + int flags = fastOpen ? msgFastopen() : 0; + int res = sendTo(fd, useIpv6(addr), buf, pos, limit, address, scopeId, port, flags); if (res >= 0) { return res; } - if (res == ERROR_ECONNREFUSED_NEGATIVE) { - throw new PortUnreachableException("sendToAddress failed"); + if (res == ERRNO_EINPROGRESS_NEGATIVE && fastOpen) { + // This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection. + // In this case, our TCP connection will be established normally, but no data was transmitted at this time. + // We'll just transmit the data with normal writes later. + return 0; } - return ioResult("sendToAddress", res); + if (res == ERROR_ECONNREFUSED_NEGATIVE) { + throw new PortUnreachableException("sendTo failed"); + } + return ioResult("sendTo", res); } - public final int sendToAddresses(long memoryAddress, int length, InetAddress addr, int port) throws IOException { + public final int sendToAddress(long memoryAddress, int pos, int limit, InetAddress addr, int port) + throws IOException { + return sendToAddress(memoryAddress, pos, limit, addr, port, false); + } + + public final int sendToAddress(long memoryAddress, int pos, int limit, InetAddress addr, int port, + boolean fastOpen) throws IOException { // just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected // to be called frequently byte[] address; @@ -172,11 +166,52 @@ public class Socket extends FileDescriptor { scopeId = 0; address = ipv4MappedIpv6Address(addr.getAddress()); } - int res = sendToAddresses(fd, useIpv6(addr), memoryAddress, length, address, scopeId, port); + int flags = fastOpen ? msgFastopen() : 0; + int res = sendToAddress(fd, useIpv6(addr), memoryAddress, pos, limit, address, scopeId, port, flags); if (res >= 0) { return res; } + if (res == ERRNO_EINPROGRESS_NEGATIVE && fastOpen) { + // This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection. + // In this case, our TCP connection will be established normally, but no data was transmitted at this time. + // We'll just transmit the data with normal writes later. + return 0; + } + if (res == ERROR_ECONNREFUSED_NEGATIVE) { + throw new PortUnreachableException("sendToAddress failed"); + } + return ioResult("sendToAddress", res); + } + public final int sendToAddresses(long memoryAddress, int length, InetAddress addr, int port) throws IOException { + return sendToAddresses(memoryAddress, length, addr, port, false); + } + + public final int sendToAddresses(long memoryAddress, int length, InetAddress addr, int port, boolean fastOpen) + throws IOException { + // just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected + // to be called frequently + byte[] address; + int scopeId; + if (addr instanceof Inet6Address) { + address = addr.getAddress(); + scopeId = ((Inet6Address) addr).getScopeId(); + } else { + // convert to ipv4 mapped ipv6 address; + scopeId = 0; + address = ipv4MappedIpv6Address(addr.getAddress()); + } + int flags = fastOpen ? msgFastopen() : 0; + int res = sendToAddresses(fd, useIpv6(addr), memoryAddress, length, address, scopeId, port, flags); + if (res >= 0) { + return res; + } + if (res == ERRNO_EINPROGRESS_NEGATIVE && fastOpen) { + // This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection. + // In this case, our TCP connection will be established normally, but no data was transmitted at this time. + // We'll just transmit the data with normal writes later. + return 0; + } if (res == ERROR_ECONNREFUSED_NEGATIVE) { throw new PortUnreachableException("sendToAddresses failed"); } @@ -467,11 +502,16 @@ public class Socket extends FileDescriptor { private static native byte[] localAddress(int fd); private static native int sendTo( - int fd, boolean ipv6, ByteBuffer buf, int pos, int limit, byte[] address, int scopeId, int port); + int fd, boolean ipv6, ByteBuffer buf, int pos, int limit, byte[] address, int scopeId, int port, + int flags); + private static native int sendToAddress( - int fd, boolean ipv6, long memoryAddress, int pos, int limit, byte[] address, int scopeId, int port); + int fd, boolean ipv6, long memoryAddress, int pos, int limit, byte[] address, int scopeId, int port, + int flags); + private static native int sendToAddresses( - int fd, boolean ipv6, long memoryAddress, int length, byte[] address, int scopeId, int port); + int fd, boolean ipv6, long memoryAddress, int length, byte[] address, int scopeId, int port, + int flags); private static native DatagramSocketAddress recvFrom( int fd, ByteBuffer buf, int pos, int limit) throws IOException; @@ -479,6 +519,7 @@ public class Socket extends FileDescriptor { int fd, long memoryAddress, int pos, int limit) throws IOException; private static native int recvFd(int fd); private static native int sendFd(int socketFd, int fd); + private static native int msgFastopen(); private static native int newSocketStreamFd(boolean ipv6); private static native int newSocketDgramFd(boolean ipv6); diff --git a/transport/src/main/java/io/netty/channel/ChannelOption.java b/transport/src/main/java/io/netty/channel/ChannelOption.java index 260a68dcb2..d75ed02393 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOption.java +++ b/transport/src/main/java/io/netty/channel/ChannelOption.java @@ -24,7 +24,7 @@ import java.net.InetAddress; import java.net.NetworkInterface; /** - * A {@link ChannelOption} allows to configure a {@link ChannelConfig} in a type-safe + * A {@link ChannelOption} allows to configure a {@link ChannelConfig} in a type-safe * way. Which {@link ChannelOption} is supported depends on the actual implementation * of {@link ChannelConfig} and may depend on the nature of the transport it belongs * to. @@ -125,6 +125,7 @@ public class ChannelOption extends AbstractConstant> { public static final ChannelOption IP_MULTICAST_LOOP_DISABLED = valueOf("IP_MULTICAST_LOOP_DISABLED"); public static final ChannelOption TCP_NODELAY = valueOf("TCP_NODELAY"); + public static final ChannelOption TCP_FASTOPEN_CONNECT = valueOf("TCP_FASTOPEN_CONNECT"); @Deprecated public static final ChannelOption DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION =