diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 933436f034..d46f031034 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -28,6 +28,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; @@ -780,7 +781,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH forceFlush(ctx); // Explicit start handshake processing once we send the first message. This will also ensure // we will schedule the timeout if needed. - startHandshakeProcessing(); + startHandshakeProcessing(true); return; } @@ -1957,20 +1958,26 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; - pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(ctx.channel(), 16); - if (ctx.channel().isActive()) { - startHandshakeProcessing(); + Channel channel = ctx.channel(); + pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(channel, 16); + boolean fastOpen = Boolean.TRUE.equals(channel.config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT)); + boolean active = channel.isActive(); + if (active || fastOpen) { + // Do not flush the handshake when TCP Fast Open is enabled, unless the channel is active. + // With TCP Fast Open, we write to the outbound buffer before the TCP connect is established. + // The buffer will then be flushed as part of estabilishing the connection, saving us a round-trip. + startHandshakeProcessing(active || !fastOpen); } } - private void startHandshakeProcessing() { + private void startHandshakeProcessing(boolean flushAtEnd) { if (!handshakeStarted) { handshakeStarted = true; if (engine.getUseClientMode()) { // Begin the initial handshake. // channelActive() event has been fired already, which means this.channelActive() will // not be invoked. We have to initialize here instead. - handshake(); + handshake(flushAtEnd); } applyHandshakeTimeout(); } @@ -2022,28 +2029,30 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH oldHandshakePromise.addListener(new PromiseNotifier>(newHandshakePromise)); } else { handshakePromise = newHandshakePromise; - handshake(); + handshake(true); applyHandshakeTimeout(); } } /** * Performs TLS (re)negotiation. + * @param flushAtEnd Set to {@code true} if the outbound buffer should be flushed (written to the network) at the + * end. Set to {@code false} if the handshake will be flushed later, e.g. as part of TCP Fast Open + * connect. */ - private void handshake() { + private void handshake(boolean flushAtEnd) { if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) { // Not all SSLEngine implementations support calling beginHandshake multiple times while a handshake // is in progress. See https://github.com/netty/netty/issues/4718. return; - } else { - if (handshakePromise.isDone()) { - // If the handshake is done already lets just return directly as there is no need to trigger it again. - // This can happen if the handshake(...) was triggered before we called channelActive(...) by a - // flush() that was triggered by a ChannelFutureListener that was added to the ChannelFuture returned - // from the connect(...) method. In this case we will see the flush() happen before we had a chance to - // call fireChannelActive() on the pipeline. - return; - } + } + if (handshakePromise.isDone()) { + // If the handshake is done already lets just return directly as there is no need to trigger it again. + // This can happen if the handshake(...) was triggered before we called channelActive(...) by a + // flush() that was triggered by a ChannelFutureListener that was added to the ChannelFuture returned + // from the connect(...) method. In this case we will see the flush() happen before we had a chance to + // call fireChannelActive() on the pipeline. + return; } // Begin handshake. @@ -2054,7 +2063,9 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH } catch (Throwable e) { setHandshakeFailure(ctx, e); } finally { - forceFlush(ctx); + if (flushAtEnd) { + forceFlush(ctx); + } } } @@ -2105,7 +2116,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { if (!startTls) { - startHandshakeProcessing(); + startHandshakeProcessing(true); } ctx.fireChannelActive(); } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectTest.java index 2ac6b8aefe..912cda338f 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectTest.java @@ -17,19 +17,36 @@ package io.netty.testsuite.transport.socket; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.socket.SocketChannel; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; +import io.netty.util.internal.StringUtil; +import org.junit.AssumptionViolatedException; import org.junit.Test; +import java.io.ByteArrayOutputStream; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; -import static org.junit.Assert.*; +import static io.netty.buffer.ByteBufUtil.writeAscii; +import static io.netty.buffer.UnpooledByteBufAllocator.DEFAULT; +import static io.netty.util.CharsetUtil.US_ASCII; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class SocketConnectTest extends AbstractSocketTest { @@ -110,8 +127,93 @@ public class SocketConnectTest extends AbstractSocketTest { } } + @Test(timeout = 3000) + public void testWriteWithFastOpenBeforeConnect() throws Throwable { + run(); + } + + public void testWriteWithFastOpenBeforeConnect(ServerBootstrap sb, Bootstrap cb) throws Throwable { + enableTcpFastOpen(sb, cb); + sb.childOption(ChannelOption.AUTO_READ, true); + cb.option(ChannelOption.AUTO_READ, true); + + sb.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new EchoServerHandler()); + } + }); + + Channel sc = sb.bind().sync().channel(); + connectAndVerifyDataTransfer(cb, sc); + connectAndVerifyDataTransfer(cb, sc); + } + + private static void connectAndVerifyDataTransfer(Bootstrap cb, Channel sc) + throws InterruptedException { + BufferingClientHandler handler = new BufferingClientHandler(); + cb.handler(handler); + ChannelFuture register = cb.register(); + Channel channel = register.sync().channel(); + ChannelFuture write = channel.write(writeAscii(DEFAULT, "[fastopen]")); + SocketAddress remoteAddress = sc.localAddress(); + ChannelFuture connectFuture = channel.connect(remoteAddress); + Channel cc = connectFuture.sync().channel(); + cc.writeAndFlush(writeAscii(DEFAULT, "[normal data]")).sync(); + write.sync(); + String expectedString = "[fastopen][normal data]"; + String result = handler.collectBuffer(expectedString.getBytes(US_ASCII).length); + cc.disconnect().sync(); + assertEquals(expectedString, result); + } + + protected void enableTcpFastOpen(ServerBootstrap sb, Bootstrap cb) { + throw new AssumptionViolatedException( + "Support for testing TCP_FASTOPEN not enabled for " + StringUtil.simpleClassName(this)); + } + private static void assertLocalAddress(InetSocketAddress address) { assertTrue(address.getPort() > 0); assertFalse(address.getAddress().isAnyLocalAddress()); } + + private static class BufferingClientHandler extends ChannelInboundHandlerAdapter { + private final Semaphore semaphore = new Semaphore(0); + private final ByteArrayOutputStream streamBuffer = new ByteArrayOutputStream(); + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + int readableBytes = buf.readableBytes(); + buf.readBytes(streamBuffer, readableBytes); + semaphore.release(readableBytes); + buf.release(); + } else { + throw new IllegalArgumentException("Unexpected message type: " + msg); + } + } + + String collectBuffer(int expectedBytes) throws InterruptedException { + semaphore.acquire(expectedBytes); + byte[] bytes = streamBuffer.toByteArray(); + streamBuffer.reset(); + return new String(bytes, US_ASCII); + } + } + + private static final class EchoServerHandler extends ChannelInboundHandlerAdapter { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof ByteBuf) { + ByteBuf buffer = ctx.alloc().buffer(); + ByteBuf buf = (ByteBuf) msg; + buffer.writeBytes(buf); + buf.release(); + ctx.channel().writeAndFlush(buffer); + } else { + throw new IllegalArgumentException("Unexpected message type: " + msg); + } + } + } } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketMultipleConnectTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketMultipleConnectTest.java index aa6cbdec44..b8b3976ae3 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketMultipleConnectTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketMultipleConnectTest.java @@ -65,7 +65,7 @@ public class SocketMultipleConnectTest extends AbstractSocketTest { List> factories = new ArrayList>(); for (TestsuitePermutation.BootstrapComboFactory comboFactory - : SocketTestPermutation.INSTANCE.socket()) { + : SocketTestPermutation.INSTANCE.socketWithFastOpen()) { if (comboFactory.newClientInstance().config().group() instanceof NioEventLoopGroup) { factories.add(comboFactory); } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketReadPendingTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketReadPendingTest.java index bfd3c946c6..ed3e3fa17c 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketReadPendingTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketReadPendingTest.java @@ -40,7 +40,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class SocketReadPendingTest extends AbstractSocketTest { - @Test(timeout = 30000) + @Test(timeout = 60000) public void testReadPendingIsResetAfterEachRead() throws Throwable { run(); } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java index 4623dd3e66..cace51b2ec 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java @@ -113,6 +113,22 @@ public class SocketTestPermutation { return list; } + public List> socketWithFastOpen() { + // Make the list of ServerBootstrap factories. + List> sbfs = serverSocket(); + + // Make the list of Bootstrap factories. + List> cbfs = clientSocketWithFastOpen(); + + // Populate the combinations + List> list = combo(sbfs, cbfs); + + // Remove the OIO-OIO case which often leads to a dead lock by its nature. + list.remove(list.size() - 1); + + return list; + } + public List> datagram(final InternetProtocolFamily family) { // Make the list of Bootstrap factories. List> bfs = Arrays.asList( @@ -183,6 +199,10 @@ public class SocketTestPermutation { ); } + public List> clientSocketWithFastOpen() { + return clientSocket(); + } + public List> datagramSocket() { return Arrays.asList( new BootstrapFactory() { diff --git a/transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c b/transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c index 16453338aa..6f5a245bd4 100644 --- a/transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c +++ b/transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c @@ -124,10 +124,6 @@ static void netty_epoll_linuxsocket_setTcpFastOpen(JNIEnv* env, jclass clazz, ji netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_FASTOPEN, &optval, sizeof(optval)); } -static void netty_epoll_linuxsocket_setTcpFastOpenConnect(JNIEnv* env, jclass clazz, jint fd, jint optval) { - netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_FASTOPEN_CONNECT, &optval, sizeof(optval)); -} - static void netty_epoll_linuxsocket_setTcpKeepIdle(JNIEnv* env, jclass clazz, jint fd, jint optval) { netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_KEEPIDLE, &optval, sizeof(optval)); } @@ -596,20 +592,6 @@ static jint netty_epoll_linuxsocket_isTcpQuickAck(JNIEnv* env, jclass clazz, jin return optval; } -static jint netty_epoll_linuxsocket_isTcpFastOpenConnect(JNIEnv* env, jclass clazz, jint fd) { - int optval; - // We call netty_unix_socket_getOption0 directly so we can handle ENOPROTOOPT by ourself. - if (netty_unix_socket_getOption0(fd, IPPROTO_TCP, TCP_FASTOPEN_CONNECT, &optval, sizeof(optval)) == -1) { - if (errno == ENOPROTOOPT) { - // Not supported by the system, so just return 0. - return 0; - } - netty_unix_socket_getOptionHandleError(env, errno); - return -1; - } - return optval; -} - static jint netty_epoll_linuxsocket_getTcpNotSentLowAt(JNIEnv* env, jclass clazz, jint fd) { int optval; if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &optval, sizeof(optval)) == -1) { @@ -681,8 +663,6 @@ static const JNINativeMethod fixed_method_table[] = { { "getTcpNotSentLowAt", "(I)I", (void *) netty_epoll_linuxsocket_getTcpNotSentLowAt }, { "isTcpQuickAck", "(I)I", (void *) netty_epoll_linuxsocket_isTcpQuickAck }, { "setTcpFastOpen", "(II)V", (void *) netty_epoll_linuxsocket_setTcpFastOpen }, - { "setTcpFastOpenConnect", "(II)V", (void *) netty_epoll_linuxsocket_setTcpFastOpenConnect }, - { "isTcpFastOpenConnect", "(I)I", (void *) netty_epoll_linuxsocket_isTcpFastOpenConnect }, { "setTcpKeepIdle", "(II)V", (void *) netty_epoll_linuxsocket_setTcpKeepIdle }, { "setTcpKeepIntvl", "(II)V", (void *) netty_epoll_linuxsocket_setTcpKeepIntvl }, { "setTcpKeepCnt", "(II)V", (void *) netty_epoll_linuxsocket_setTcpKeepCnt }, diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 2f39adc89c..3589ac818f 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -31,11 +31,11 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.ConnectTimeoutException; import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; -import io.netty.channel.nio.AbstractNioChannel; import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.ChannelInputShutdownReadComplete; import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.unix.FileDescriptor; +import io.netty.channel.unix.IovArray; import io.netty.channel.unix.Socket; import io.netty.channel.unix.UnixChannel; import io.netty.util.ReferenceCountUtil; @@ -377,6 +377,43 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann return WRITE_STATUS_SNDBUF_FULL; } + /** + * Write bytes to the socket, with or without a remote address. + * Used for datagram and TCP client fast open writes. + */ + final long doWriteOrSendBytes(ByteBuf data, InetSocketAddress remoteAddress, boolean fastOpen) + throws IOException { + assert !(fastOpen && remoteAddress == null) : "fastOpen requires a remote address"; + if (data.hasMemoryAddress()) { + long memoryAddress = data.memoryAddress(); + if (remoteAddress == null) { + return socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex()); + } + return socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(), + remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen); + } + + if (data.nioBufferCount() > 1) { + IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray(); + array.add(data, data.readerIndex(), data.readableBytes()); + int cnt = array.count(); + assert cnt != 0; + + if (remoteAddress == null) { + return socket.writevAddresses(array.memoryAddress(0), cnt); + } + return socket.sendToAddresses(array.memoryAddress(0), cnt, + remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen); + } + + ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes()); + if (remoteAddress == null) { + return socket.write(nioData, nioData.position(), nioData.limit()); + } + return socket.sendTo(nioData, nioData.position(), nioData.limit(), + remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen); + } + protected abstract class AbstractEpollUnsafe extends AbstractUnsafe { boolean readPending; boolean maybeMoreDataToRead; @@ -730,7 +767,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann return connected; } - private boolean doConnect0(SocketAddress remote) throws Exception { + boolean doConnect0(SocketAddress remote) throws Exception { boolean success = false; try { boolean connected = socket.connect(remote); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java index efc6e1149c..7eff9754be 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java @@ -33,8 +33,12 @@ public final class EpollChannelOption extends UnixChannelOption { public static final ChannelOption IP_TRANSPARENT = valueOf("IP_TRANSPARENT"); public static final ChannelOption IP_RECVORIGDSTADDR = valueOf("IP_RECVORIGDSTADDR"); public static final ChannelOption TCP_FASTOPEN = valueOf(EpollChannelOption.class, "TCP_FASTOPEN"); - public static final ChannelOption TCP_FASTOPEN_CONNECT = - valueOf(EpollChannelOption.class, "TCP_FASTOPEN_CONNECT"); + + /** + * @deprecated Use {@link ChannelOption#TCP_FASTOPEN_CONNECT} instead. + */ + @Deprecated + public static final ChannelOption TCP_FASTOPEN_CONNECT = ChannelOption.TCP_FASTOPEN_CONNECT; public static final ChannelOption TCP_DEFER_ACCEPT = ChannelOption.valueOf(EpollChannelOption.class, "TCP_DEFER_ACCEPT"); public static final ChannelOption TCP_QUICKACK = valueOf(EpollChannelOption.class, "TCP_QUICKACK"); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index 9ac2c88f23..0736a6fc9d 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -32,7 +32,6 @@ import io.netty.channel.socket.InternetProtocolFamily; import io.netty.channel.unix.DatagramSocketAddress; import io.netty.channel.unix.Errors; import io.netty.channel.unix.Errors.NativeIoException; -import io.netty.channel.unix.IovArray; import io.netty.channel.unix.Socket; import io.netty.channel.unix.UnixChannelUtil; import io.netty.util.ReferenceCountUtil; @@ -350,7 +349,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements private boolean doWriteMessage(Object msg) throws Exception { final ByteBuf data; - InetSocketAddress remoteAddress; + final InetSocketAddress remoteAddress; if (msg instanceof AddressedEnvelope) { @SuppressWarnings("unchecked") AddressedEnvelope envelope = @@ -367,38 +366,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements return true; } - final long writtenBytes; - if (data.hasMemoryAddress()) { - long memoryAddress = data.memoryAddress(); - if (remoteAddress == null) { - writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex()); - } else { - writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(), - remoteAddress.getAddress(), remoteAddress.getPort()); - } - } else if (data.nioBufferCount() > 1) { - IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray(); - array.add(data, data.readerIndex(), data.readableBytes()); - int cnt = array.count(); - assert cnt != 0; - - if (remoteAddress == null) { - writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt); - } else { - writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt, - remoteAddress.getAddress(), remoteAddress.getPort()); - } - } else { - ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes()); - if (remoteAddress == null) { - writtenBytes = socket.write(nioData, nioData.position(), nioData.limit()); - } else { - writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(), - remoteAddress.getAddress(), remoteAddress.getPort()); - } - } - - return writtenBytes > 0; + return doWriteOrSendBytes(data, remoteAddress, false) > 0; } @Override diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index ab0efcb819..5e20307a5d 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -15,8 +15,10 @@ */ package io.netty.channel.epoll; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelException; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; import io.netty.util.concurrent.GlobalEventExecutor; @@ -24,6 +26,7 @@ import io.netty.util.concurrent.GlobalEventExecutor; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -111,6 +114,29 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme return new EpollSocketChannelUnsafe(); } + @Override + boolean doConnect0(SocketAddress remote) throws Exception { + if (Native.IS_SUPPORTING_TCP_FASTOPEN && config.isTcpFastOpenConnect()) { + ChannelOutboundBuffer outbound = unsafe().outboundBuffer(); + outbound.addFlush(); + Object curr; + if ((curr = outbound.current()) instanceof ByteBuf) { + ByteBuf initialData = (ByteBuf) curr; + // If no cookie is present, the write fails with EINPROGRESS and this call basically + // becomes a normal async connect. All writes will be sent normally afterwards. + long localFlushedAmount = doWriteOrSendBytes( + initialData, (InetSocketAddress) remote, true); + if (localFlushedAmount > 0) { + // We had a cookie and our fast-open proceeded. Remove written data + // then continue with normal TCP operation. + outbound.removeBytes(localFlushedAmount); + return true; + } + } + } + return super.doConnect0(remote); + } + private final class EpollSocketChannelUnsafe extends EpollStreamUnsafe { @Override protected Executor prepareToClose() { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java index b330013111..ef341514db 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java @@ -39,6 +39,7 @@ import static io.netty.channel.ChannelOption.TCP_NODELAY; public final class EpollSocketChannelConfig extends EpollChannelConfig implements SocketChannelConfig { private volatile boolean allowHalfClosure; + private volatile boolean tcpFastopen; /** * Creates a new instance. @@ -60,7 +61,7 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement ALLOW_HALF_CLOSURE, EpollChannelOption.TCP_CORK, EpollChannelOption.TCP_NOTSENT_LOWAT, EpollChannelOption.TCP_KEEPCNT, EpollChannelOption.TCP_KEEPIDLE, EpollChannelOption.TCP_KEEPINTVL, EpollChannelOption.TCP_MD5SIG, EpollChannelOption.TCP_QUICKACK, EpollChannelOption.IP_TRANSPARENT, - EpollChannelOption.TCP_FASTOPEN_CONNECT, EpollChannelOption.SO_BUSY_POLL); + ChannelOption.TCP_FASTOPEN_CONNECT, EpollChannelOption.SO_BUSY_POLL); } @SuppressWarnings("unchecked") @@ -114,7 +115,7 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement if (option == EpollChannelOption.IP_TRANSPARENT) { return (T) Boolean.valueOf(isIpTransparent()); } - if (option == EpollChannelOption.TCP_FASTOPEN_CONNECT) { + if (option == ChannelOption.TCP_FASTOPEN_CONNECT) { return (T) Boolean.valueOf(isTcpFastOpenConnect()); } if (option == EpollChannelOption.SO_BUSY_POLL) { @@ -163,7 +164,7 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement setTcpMd5Sig(m); } else if (option == EpollChannelOption.TCP_QUICKACK) { setTcpQuickAck((Boolean) value); - } else if (option == EpollChannelOption.TCP_FASTOPEN_CONNECT) { + } else if (option == ChannelOption.TCP_FASTOPEN_CONNECT) { setTcpFastOpenConnect((Boolean) value); } else if (option == EpollChannelOption.SO_BUSY_POLL) { setSoBusyPoll((Integer) value); @@ -551,29 +552,21 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement } /** - * Set the {@code TCP_FASTOPEN_CONNECT} option on the socket. Requires Linux kernel 4.11 or later. - * See - * this commit - * for more details. + * Enables client TCP fast open. {@code TCP_FASTOPEN_CONNECT} normally + * requires Linux kernel 4.11 or later, so instead we use the traditional fast open + * client socket mechanics that work with kernel 3.6 and later. See this + * LWN article for more info. */ public EpollSocketChannelConfig setTcpFastOpenConnect(boolean fastOpenConnect) { - try { - ((EpollSocketChannel) channel).socket.setTcpFastOpenConnect(fastOpenConnect); - return this; - } catch (IOException e) { - throw new ChannelException(e); - } + tcpFastopen = fastOpenConnect; + return this; } /** - * Returns {@code true} if {@code TCP_FASTOPEN_CONNECT} is enabled, {@code false} otherwise. + * Returns {@code true} if TCP fast open is enabled, {@code false} otherwise. */ public boolean isTcpFastOpenConnect() { - try { - return ((EpollSocketChannel) channel).socket.isTcpFastOpenConnect(); - } catch (IOException e) { - throw new ChannelException(e); - } + return tcpFastopen; } @Override diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/LinuxSocket.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/LinuxSocket.java index 6871bcd79e..62283fb5dc 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/LinuxSocket.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/LinuxSocket.java @@ -179,14 +179,6 @@ final class LinuxSocket extends Socket { setTcpFastOpen(intValue(), tcpFastopenBacklog); } - void setTcpFastOpenConnect(boolean tcpFastOpenConnect) throws IOException { - setTcpFastOpenConnect(intValue(), tcpFastOpenConnect ? 1 : 0); - } - - boolean isTcpFastOpenConnect() throws IOException { - return isTcpFastOpenConnect(intValue()) != 0; - } - void setTcpKeepIdle(int seconds) throws IOException { setTcpKeepIdle(intValue(), seconds); } @@ -369,7 +361,6 @@ final class LinuxSocket extends Socket { private static native int isIpRecvOrigDestAddr(int fd) throws IOException; private static native void getTcpInfo(int fd, long[] array) throws IOException; private static native PeerCredentials getPeerCredentials(int fd) throws IOException; - private static native int isTcpFastOpenConnect(int fd) throws IOException; private static native void setTcpDeferAccept(int fd, int deferAccept) throws IOException; private static native void setTcpQuickAck(int fd, int quickAck) throws IOException; @@ -377,7 +368,6 @@ final class LinuxSocket extends Socket { private static native void setSoBusyPoll(int fd, int loopMicros) throws IOException; private static native void setTcpNotSentLowAt(int fd, int tcpNotSentLowAt) throws IOException; private static native void setTcpFastOpen(int fd, int tcpFastopenBacklog) throws IOException; - private static native void setTcpFastOpenConnect(int fd, int tcpFastOpenConnect) throws IOException; private static native void setTcpKeepIdle(int fd, int seconds) throws IOException; private static native void setTcpKeepIntvl(int fd, int seconds) throws IOException; private static native void setTcpKeepCnt(int fd, int probes) throws IOException; diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollCompositeBufferGatheringWriteTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollCompositeBufferGatheringWriteTest.java index 4749a438ba..17dd91e408 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollCompositeBufferGatheringWriteTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollCompositeBufferGatheringWriteTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollCompositeBufferGatheringWriteTest extends CompositeBufferGatheringWriteTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketAutoReadTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketAutoReadTest.java index 9a6b2b7c53..330aa57146 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketAutoReadTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketAutoReadTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollETSocketAutoReadTest extends SocketAutoReadTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketConditionalWritabilityTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketConditionalWritabilityTest.java index 7f79770e5e..ffff608d74 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketConditionalWritabilityTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketConditionalWritabilityTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollETSocketConditionalWritabilityTest extends SocketConditionalWritabilityTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketDataReadInitialStateTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketDataReadInitialStateTest.java index 98659f96ed..b246234d58 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketDataReadInitialStateTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketDataReadInitialStateTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollETSocketDataReadInitialStateTest extends SocketDataReadInitialStateTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketExceptionHandlingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketExceptionHandlingTest.java index ac3d2f12b9..06beed2b86 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketExceptionHandlingTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketExceptionHandlingTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollETSocketExceptionHandlingTest extends SocketExceptionHandlingTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketHalfClosed.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketHalfClosed.java index c2dfebfa07..74ae17a9ff 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketHalfClosed.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketHalfClosed.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollETSocketHalfClosed extends SocketHalfClosedTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketReadPendingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketReadPendingTest.java index 6b4e856a81..7fe3ebc6ec 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketReadPendingTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketReadPendingTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollETSocketReadPendingTest extends SocketReadPendingTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketAutoReadTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketAutoReadTest.java index 426159fde7..8f6b794f22 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketAutoReadTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketAutoReadTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollLTSocketAutoReadTest extends SocketAutoReadTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketConditionalWritabilityTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketConditionalWritabilityTest.java index fdcf7ebf3a..9bfb25804c 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketConditionalWritabilityTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketConditionalWritabilityTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollLTSocketConditionalWritabilityTest extends SocketConditionalWritabilityTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketDataReadInitialStateTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketDataReadInitialStateTest.java index bca531909c..5d484390c8 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketDataReadInitialStateTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketDataReadInitialStateTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollLTSocketDataReadInitialStateTest extends SocketDataReadInitialStateTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketExceptionHandlingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketExceptionHandlingTest.java index 841450ecf2..f11635eabf 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketExceptionHandlingTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketExceptionHandlingTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollLTSocketExceptionHandlingTest extends SocketExceptionHandlingTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketHalfClosed.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketHalfClosed.java index f8e7aa6a0f..cbf3dbfeb9 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketHalfClosed.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketHalfClosed.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollLTSocketHalfClosed extends SocketHalfClosedTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketReadPendingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketReadPendingTest.java index 03546541dd..46b8b1b77a 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketReadPendingTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketReadPendingTest.java @@ -26,7 +26,7 @@ import java.util.List; public class EpollLTSocketReadPendingTest extends SocketReadPendingTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelNotYetConnectedTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelNotYetConnectedTest.java index ab7e6bf5b7..5cbfaf08df 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelNotYetConnectedTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelNotYetConnectedTest.java @@ -24,6 +24,6 @@ import java.util.List; public class EpollSocketChannelNotYetConnectedTest extends SocketChannelNotYetConnectedTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.clientSocket(); + return EpollSocketTestPermutation.INSTANCE.clientSocketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketCloseForciblyTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketCloseForciblyTest.java index 657a03462c..43b561e95e 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketCloseForciblyTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketCloseForciblyTest.java @@ -25,6 +25,6 @@ import java.util.List; public class EpollSocketCloseForciblyTest extends SocketCloseForciblyTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketConnectTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketConnectTest.java index 93742ca23a..6c6b32c790 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketConnectTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketConnectTest.java @@ -17,6 +17,7 @@ package io.netty.channel.epoll; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelOption; import io.netty.testsuite.transport.TestsuitePermutation; import io.netty.testsuite.transport.socket.SocketConnectTest; @@ -26,6 +27,12 @@ public class EpollSocketConnectTest extends SocketConnectTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen(); + } + + @Override + protected void enableTcpFastOpen(ServerBootstrap sb, Bootstrap cb) { + sb.childOption(EpollChannelOption.TCP_FASTOPEN, 5); + cb.option(ChannelOption.TCP_FASTOPEN_CONNECT, true); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketEchoTest.java index d4a889248f..61e89faa79 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketEchoTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketEchoTest.java @@ -26,6 +26,6 @@ public class EpollSocketEchoTest extends SocketEchoTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFileRegionTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFileRegionTest.java index f4c14d0de7..a28ae007bd 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFileRegionTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFileRegionTest.java @@ -26,6 +26,6 @@ public class EpollSocketFileRegionTest extends SocketFileRegionTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFixedLengthEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFixedLengthEchoTest.java index 7d6ef9faf0..92cb620de3 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFixedLengthEchoTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFixedLengthEchoTest.java @@ -26,6 +26,6 @@ public class EpollSocketFixedLengthEchoTest extends SocketFixedLengthEchoTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketGatheringWriteTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketGatheringWriteTest.java index 5cea4caf51..7ae8a5aed7 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketGatheringWriteTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketGatheringWriteTest.java @@ -26,6 +26,6 @@ public class EpollSocketGatheringWriteTest extends SocketGatheringWriteTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketMultipleConnectTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketMultipleConnectTest.java index 8e50c72b96..b70313545e 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketMultipleConnectTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketMultipleConnectTest.java @@ -32,7 +32,7 @@ public class EpollSocketMultipleConnectTest extends SocketMultipleConnectTest { List> factories = new ArrayList>(); for (TestsuitePermutation.BootstrapComboFactory comboFactory - : EpollSocketTestPermutation.INSTANCE.socket()) { + : EpollSocketTestPermutation.INSTANCE.socketWithFastOpen()) { EventLoopGroup group = comboFactory.newClientInstance().config().group(); if (group instanceof NioEventLoopGroup || group instanceof EpollEventLoopGroup) { factories.add(comboFactory); diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketObjectEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketObjectEchoTest.java index 20141e8538..78e7527fce 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketObjectEchoTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketObjectEchoTest.java @@ -26,6 +26,6 @@ public class EpollSocketObjectEchoTest extends SocketObjectEchoTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketRstTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketRstTest.java index 5aef606b59..e26066dd56 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketRstTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketRstTest.java @@ -32,7 +32,7 @@ import static org.junit.Assert.assertTrue; public class EpollSocketRstTest extends SocketRstTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen(); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslClientRenegotiateTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslClientRenegotiateTest.java index 98465c8620..26ed3dc4da 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslClientRenegotiateTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslClientRenegotiateTest.java @@ -31,6 +31,6 @@ public class EpollSocketSslClientRenegotiateTest extends SocketSslClientRenegoti @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslEchoTest.java index 7b41a2e061..f56a769177 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslEchoTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslEchoTest.java @@ -36,6 +36,6 @@ public class EpollSocketSslEchoTest extends SocketSslEchoTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslGreetingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslGreetingTest.java index 3030bc3f01..cd294d1446 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslGreetingTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslGreetingTest.java @@ -31,6 +31,6 @@ public class EpollSocketSslGreetingTest extends SocketSslGreetingTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslSessionReuseTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslSessionReuseTest.java index 1a3f09f7db..35c4d149ec 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslSessionReuseTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslSessionReuseTest.java @@ -31,6 +31,6 @@ public class EpollSocketSslSessionReuseTest extends SocketSslSessionReuseTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStartTlsTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStartTlsTest.java index 0fe27d07e7..00d2f49499 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStartTlsTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStartTlsTest.java @@ -31,6 +31,6 @@ public class EpollSocketStartTlsTest extends SocketStartTlsTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStringEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStringEchoTest.java index 7aa42a7064..b25a40af16 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStringEchoTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStringEchoTest.java @@ -26,6 +26,6 @@ public class EpollSocketStringEchoTest extends SocketStringEchoTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.socket(); + return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java index c734182e68..66d79da145 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java @@ -19,6 +19,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.InternetProtocolFamily; import io.netty.channel.socket.nio.NioDatagramChannel; @@ -53,10 +54,21 @@ class EpollSocketTestPermutation extends SocketTestPermutation { new EpollEventLoopGroup(WORKERS, new DefaultThreadFactory("testsuite-epoll-worker", true)); private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollSocketTestPermutation.class); + // Constants describing if/how TCP Fast Open is allowed to work on Linux: + private static final int TFO_ENABLED_CLIENT = 1; + private static final int TFO_ENABLED_SERVER = 2; @Override public List> socket() { + List> list = + combo(serverSocket(), clientSocketWithFastOpen()); + list.remove(list.size() - 1); // Exclude NIO x NIO test + + return list; + } + + public List> socketWithoutFastOpen() { List> list = combo(serverSocket(), clientSocket()); @@ -76,7 +88,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation { .channel(EpollServerSocketChannel.class); } }); - if (isServerFastOpen()) { + if (isFastOpen()) { toReturn.add(new BootstrapFactory() { @Override public ServerBootstrap newInstance() { @@ -98,30 +110,49 @@ class EpollSocketTestPermutation extends SocketTestPermutation { return toReturn; } - @SuppressWarnings("unchecked") @Override public List> clientSocket() { - return Arrays.asList( - new BootstrapFactory() { - @Override - public Bootstrap newInstance() { - return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollSocketChannel.class); - } - }, - new BootstrapFactory() { - @Override - public Bootstrap newInstance() { - return new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class); - } + List> toReturn = new ArrayList>(); + + toReturn.add(new BootstrapFactory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollSocketChannel.class); + } + }); + + toReturn.add(new BootstrapFactory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class); + } + }); + + return toReturn; + } + + @Override + public List> clientSocketWithFastOpen() { + List> factories = clientSocket(); + + if (isFastOpen()) { + int insertIndex = factories.size() - 1; // Keep NIO fixture last. + factories.add(insertIndex, new BootstrapFactory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollSocketChannel.class) + .option(ChannelOption.TCP_FASTOPEN_CONNECT, true); } - ); + }); + } + + return factories; } @Override public List> datagram( final InternetProtocolFamily family) { // Make the list of Bootstrap factories. - @SuppressWarnings("unchecked") List> bfs = Arrays.asList( new BootstrapFactory() { @Override @@ -165,7 +196,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation { Collections.singletonList(datagramBootstrapFactory(family))); } - private BootstrapFactory datagramBootstrapFactory(final InternetProtocolFamily family) { + private static BootstrapFactory datagramBootstrapFactory(final InternetProtocolFamily family) { return new BootstrapFactory() { @Override public Bootstrap newInstance() { @@ -226,8 +257,8 @@ class EpollSocketTestPermutation extends SocketTestPermutation { ); } - public boolean isServerFastOpen() { - return AccessController.doPrivileged(new PrivilegedAction() { + public boolean isFastOpen() { + int tfoEnabled = AccessController.doPrivileged(new PrivilegedAction() { @Override public Integer run() { int fastopen = 0; @@ -237,9 +268,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation { try { in = new BufferedReader(new FileReader(file)); fastopen = Integer.parseInt(in.readLine()); - if (logger.isDebugEnabled()) { - logger.debug("{}: {}", file, fastopen); - } + logger.debug("{}: {}", file, fastopen); } catch (Exception e) { logger.debug("Failed to get TCP_FASTOPEN from: {}", file, e); } finally { @@ -252,13 +281,13 @@ class EpollSocketTestPermutation extends SocketTestPermutation { } } } else { - if (logger.isDebugEnabled()) { - logger.debug("{}: {} (non-existent)", file, fastopen); - } + logger.debug("{}: {} (non-existent)", file, fastopen); } return fastopen; } - }) == 3; + }); + // TCP Fast Open needs to be enabled for both clients and servers, before we can test our intergration with it. + return tfoEnabled == TFO_ENABLED_CLIENT + TFO_ENABLED_SERVER; } public static DomainSocketAddress newSocketAddress() { diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollWriteBeforeRegisteredTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollWriteBeforeRegisteredTest.java index 975ee69531..7e93faf0cd 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollWriteBeforeRegisteredTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollWriteBeforeRegisteredTest.java @@ -25,6 +25,6 @@ public class EpollWriteBeforeRegisteredTest extends WriteBeforeRegisteredTest { @Override protected List> newFactories() { - return EpollSocketTestPermutation.INSTANCE.clientSocket(); + return EpollSocketTestPermutation.INSTANCE.clientSocketWithFastOpen(); } } diff --git a/transport-native-unix-common/src/main/c/netty_unix_socket.c b/transport-native-unix-common/src/main/c/netty_unix_socket.c index da313eb65f..4243fbd288 100644 --- a/transport-native-unix-common/src/main/c/netty_unix_socket.c +++ b/transport-native-unix-common/src/main/c/netty_unix_socket.c @@ -39,6 +39,11 @@ #define SO_REUSEPORT 15 #endif /* SO_REUSEPORT */ +// MSG_FASTOPEN is defined in linux 3.6. We define this here so older kernels can compile. +#ifndef MSG_FASTOPEN +#define MSG_FASTOPEN 0x20000000 +#endif + static jclass datagramSocketAddressClass = NULL; static jmethodID datagramSocketAddrMethodId = NULL; static jmethodID inetSocketAddrMethodId = NULL; @@ -309,7 +314,7 @@ int netty_unix_socket_initSockaddr(JNIEnv* env, jboolean ipv6, jbyteArray addres return 0; } -static jint _sendTo(JNIEnv* env, jint fd, jboolean ipv6, void* buffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) { +static jint _sendTo(JNIEnv* env, jint fd, jboolean ipv6, void* buffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port, jint flags) { struct sockaddr_storage addr; socklen_t addrSize; if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr, &addrSize) == -1) { @@ -319,7 +324,7 @@ static jint _sendTo(JNIEnv* env, jint fd, jboolean ipv6, void* buffer, jint pos, ssize_t res; int err; do { - res = sendto(fd, buffer + pos, (size_t) (limit - pos), 0, (struct sockaddr*) &addr, addrSize); + res = sendto(fd, buffer + pos, (size_t) (limit - pos), flags, (struct sockaddr*) &addr, addrSize); // keep on writing if it was interrupted } while (res == -1 && ((err = errno) == EINTR)); @@ -623,16 +628,16 @@ static jint netty_unix_socket_newSocketDomainFd(JNIEnv* env, jclass clazz) { return fd; } -static jint netty_unix_socket_sendTo(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) { +static jint netty_unix_socket_sendTo(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port, jint flags) { // We check that GetDirectBufferAddress will not return NULL in OnLoad - return _sendTo(env, fd, ipv6, (*env)->GetDirectBufferAddress(env, jbuffer), pos, limit, address, scopeId, port); + return _sendTo(env, fd, ipv6, (*env)->GetDirectBufferAddress(env, jbuffer), pos, limit, address, scopeId, port, flags); } -static jint netty_unix_socket_sendToAddress(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) { - return _sendTo(env, fd, ipv6, (void *) (intptr_t) memoryAddress, pos, limit, address, scopeId, port); +static jint netty_unix_socket_sendToAddress(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port, jint flags) { + return _sendTo(env, fd, ipv6, (void *) (intptr_t) memoryAddress, pos, limit, address, scopeId, port, flags); } -static jint netty_unix_socket_sendToAddresses(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port) { +static jint netty_unix_socket_sendToAddresses(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port, jint flags) { struct sockaddr_storage addr; socklen_t addrSize; if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr, &addrSize) == -1) { @@ -648,7 +653,7 @@ static jint netty_unix_socket_sendToAddresses(JNIEnv* env, jclass clazz, jint fd ssize_t res; int err; do { - res = sendmsg(fd, &m, 0); + res = sendmsg(fd, &m, flags); // keep on writing if it was interrupted } while (res == -1 && ((err = errno) == EINTR)); @@ -965,6 +970,9 @@ static jint netty_unix_socket_isBroadcast(JNIEnv* env, jclass clazz, jint fd) { return optval; } +static jint netty_unit_socket_msgFastopen(JNIEnv* env, jclass clazz) { + return MSG_FASTOPEN; +} // JNI Registered Methods End @@ -982,9 +990,9 @@ static const JNINativeMethod fixed_method_table[] = { { "newSocketDgramFd", "(Z)I", (void *) netty_unix_socket_newSocketDgramFd }, { "newSocketStreamFd", "(Z)I", (void *) netty_unix_socket_newSocketStreamFd }, { "newSocketDomainFd", "()I", (void *) netty_unix_socket_newSocketDomainFd }, - { "sendTo", "(IZLjava/nio/ByteBuffer;II[BII)I", (void *) netty_unix_socket_sendTo }, - { "sendToAddress", "(IZJII[BII)I", (void *) netty_unix_socket_sendToAddress }, - { "sendToAddresses", "(IZJI[BII)I", (void *) netty_unix_socket_sendToAddresses }, + { "sendTo", "(IZLjava/nio/ByteBuffer;II[BIII)I", (void *) netty_unix_socket_sendTo }, + { "sendToAddress", "(IZJII[BIII)I", (void *) netty_unix_socket_sendToAddress }, + { "sendToAddresses", "(IZJI[BIII)I", (void *) netty_unix_socket_sendToAddresses }, // "recvFrom" has a dynamic signature // "recvFromAddress" has a dynamic signature { "recvFd", "(I)I", (void *) netty_unix_socket_recvFd }, @@ -1012,7 +1020,8 @@ static const JNINativeMethod fixed_method_table[] = { { "getSoError", "(I)I", (void *) netty_unix_socket_getSoError }, { "initialize", "(Z)V", (void *) netty_unix_socket_initialize }, { "isIPv6Preferred", "()Z", (void *) netty_unix_socket_isIPv6Preferred }, - { "isIPv6", "(I)Z", (void *) netty_unix_socket_isIPv6 } + { "isIPv6", "(I)Z", (void *) netty_unix_socket_isIPv6 }, + { "msgFastopen", "()I", (void *) netty_unit_socket_msgFastopen } }; static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]); diff --git a/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java b/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java index 602872891d..2a1ffa65d3 100644 --- a/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java +++ b/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java @@ -113,29 +113,10 @@ public class Socket extends FileDescriptor { } public final int sendTo(ByteBuffer buf, int pos, int limit, InetAddress addr, int port) throws IOException { - // just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected - // to be called frequently - byte[] address; - int scopeId; - if (addr instanceof Inet6Address) { - address = addr.getAddress(); - scopeId = ((Inet6Address) addr).getScopeId(); - } else { - // convert to ipv4 mapped ipv6 address; - scopeId = 0; - address = ipv4MappedIpv6Address(addr.getAddress()); - } - int res = sendTo(fd, useIpv6(addr), buf, pos, limit, address, scopeId, port); - if (res >= 0) { - return res; - } - if (res == ERROR_ECONNREFUSED_NEGATIVE) { - throw new PortUnreachableException("sendTo failed"); - } - return ioResult("sendTo", res); + return sendTo(buf, pos, limit, addr, port, false); } - public final int sendToAddress(long memoryAddress, int pos, int limit, InetAddress addr, int port) + public final int sendTo(ByteBuffer buf, int pos, int limit, InetAddress addr, int port, boolean fastOpen) throws IOException { // just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected // to be called frequently @@ -149,17 +130,30 @@ public class Socket extends FileDescriptor { scopeId = 0; address = ipv4MappedIpv6Address(addr.getAddress()); } - int res = sendToAddress(fd, useIpv6(addr), memoryAddress, pos, limit, address, scopeId, port); + int flags = fastOpen ? msgFastopen() : 0; + int res = sendTo(fd, useIpv6(addr), buf, pos, limit, address, scopeId, port, flags); if (res >= 0) { return res; } - if (res == ERROR_ECONNREFUSED_NEGATIVE) { - throw new PortUnreachableException("sendToAddress failed"); + if (res == ERRNO_EINPROGRESS_NEGATIVE && fastOpen) { + // This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection. + // In this case, our TCP connection will be established normally, but no data was transmitted at this time. + // We'll just transmit the data with normal writes later. + return 0; } - return ioResult("sendToAddress", res); + if (res == ERROR_ECONNREFUSED_NEGATIVE) { + throw new PortUnreachableException("sendTo failed"); + } + return ioResult("sendTo", res); } - public final int sendToAddresses(long memoryAddress, int length, InetAddress addr, int port) throws IOException { + public final int sendToAddress(long memoryAddress, int pos, int limit, InetAddress addr, int port) + throws IOException { + return sendToAddress(memoryAddress, pos, limit, addr, port, false); + } + + public final int sendToAddress(long memoryAddress, int pos, int limit, InetAddress addr, int port, + boolean fastOpen) throws IOException { // just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected // to be called frequently byte[] address; @@ -172,11 +166,52 @@ public class Socket extends FileDescriptor { scopeId = 0; address = ipv4MappedIpv6Address(addr.getAddress()); } - int res = sendToAddresses(fd, useIpv6(addr), memoryAddress, length, address, scopeId, port); + int flags = fastOpen ? msgFastopen() : 0; + int res = sendToAddress(fd, useIpv6(addr), memoryAddress, pos, limit, address, scopeId, port, flags); if (res >= 0) { return res; } + if (res == ERRNO_EINPROGRESS_NEGATIVE && fastOpen) { + // This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection. + // In this case, our TCP connection will be established normally, but no data was transmitted at this time. + // We'll just transmit the data with normal writes later. + return 0; + } + if (res == ERROR_ECONNREFUSED_NEGATIVE) { + throw new PortUnreachableException("sendToAddress failed"); + } + return ioResult("sendToAddress", res); + } + public final int sendToAddresses(long memoryAddress, int length, InetAddress addr, int port) throws IOException { + return sendToAddresses(memoryAddress, length, addr, port, false); + } + + public final int sendToAddresses(long memoryAddress, int length, InetAddress addr, int port, boolean fastOpen) + throws IOException { + // just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected + // to be called frequently + byte[] address; + int scopeId; + if (addr instanceof Inet6Address) { + address = addr.getAddress(); + scopeId = ((Inet6Address) addr).getScopeId(); + } else { + // convert to ipv4 mapped ipv6 address; + scopeId = 0; + address = ipv4MappedIpv6Address(addr.getAddress()); + } + int flags = fastOpen ? msgFastopen() : 0; + int res = sendToAddresses(fd, useIpv6(addr), memoryAddress, length, address, scopeId, port, flags); + if (res >= 0) { + return res; + } + if (res == ERRNO_EINPROGRESS_NEGATIVE && fastOpen) { + // This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection. + // In this case, our TCP connection will be established normally, but no data was transmitted at this time. + // We'll just transmit the data with normal writes later. + return 0; + } if (res == ERROR_ECONNREFUSED_NEGATIVE) { throw new PortUnreachableException("sendToAddresses failed"); } @@ -467,11 +502,16 @@ public class Socket extends FileDescriptor { private static native byte[] localAddress(int fd); private static native int sendTo( - int fd, boolean ipv6, ByteBuffer buf, int pos, int limit, byte[] address, int scopeId, int port); + int fd, boolean ipv6, ByteBuffer buf, int pos, int limit, byte[] address, int scopeId, int port, + int flags); + private static native int sendToAddress( - int fd, boolean ipv6, long memoryAddress, int pos, int limit, byte[] address, int scopeId, int port); + int fd, boolean ipv6, long memoryAddress, int pos, int limit, byte[] address, int scopeId, int port, + int flags); + private static native int sendToAddresses( - int fd, boolean ipv6, long memoryAddress, int length, byte[] address, int scopeId, int port); + int fd, boolean ipv6, long memoryAddress, int length, byte[] address, int scopeId, int port, + int flags); private static native DatagramSocketAddress recvFrom( int fd, ByteBuffer buf, int pos, int limit) throws IOException; @@ -479,6 +519,7 @@ public class Socket extends FileDescriptor { int fd, long memoryAddress, int pos, int limit) throws IOException; private static native int recvFd(int fd); private static native int sendFd(int socketFd, int fd); + private static native int msgFastopen(); private static native int newSocketStreamFd(boolean ipv6); private static native int newSocketDgramFd(boolean ipv6); diff --git a/transport/src/main/java/io/netty/channel/ChannelOption.java b/transport/src/main/java/io/netty/channel/ChannelOption.java index 260a68dcb2..d75ed02393 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOption.java +++ b/transport/src/main/java/io/netty/channel/ChannelOption.java @@ -24,7 +24,7 @@ import java.net.InetAddress; import java.net.NetworkInterface; /** - * A {@link ChannelOption} allows to configure a {@link ChannelConfig} in a type-safe + * A {@link ChannelOption} allows to configure a {@link ChannelConfig} in a type-safe * way. Which {@link ChannelOption} is supported depends on the actual implementation * of {@link ChannelConfig} and may depend on the nature of the transport it belongs * to. @@ -125,6 +125,7 @@ public class ChannelOption extends AbstractConstant> { public static final ChannelOption IP_MULTICAST_LOOP_DISABLED = valueOf("IP_MULTICAST_LOOP_DISABLED"); public static final ChannelOption TCP_NODELAY = valueOf("TCP_NODELAY"); + public static final ChannelOption TCP_FASTOPEN_CONNECT = valueOf("TCP_FASTOPEN_CONNECT"); @Deprecated public static final ChannelOption DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION =