TCP Fast Open for clients (#11006)

Support TCP Fast Open for clients and make SslHandler take advantage

Motivation:
- TCP Fast Open allow us to send a small amount of data along side the initial SYN packet when establishing a TCP connection.
- The TLS Client Hello packet is small enough to fit in there, and is also idempotent (another requirement for using TCP Fast Open), so if we can save a round-trip when establishing TLS connections when using TFO.

Modification:
- Add support for client-side TCP Fast Open for Epoll, and also lowers the Linux kernel version requirements to 3.6.
- When adding the SslHandler to a pipeline, if TCP Fast Open is enabled for the channel (and the channel is not already active) then start the handshake early by writing it to the outbound buffer.
- An important detail to note here, is that the outbound buffer is not flushed at this point, like it would for normal handshakes. The flushing happens later as part of establishing the TCP connection.

Result:
- It is now possible for clients (on epoll) to open connections with TCP Fast Open.
- The SslHandler automatically detects when this is the case, and now send its Client Hello message as part of the initial data in the TCP Fast Open flow when available, saving a round-trip when establishing TLS connections.

Co-authored-by: Colin Godsey <crgodsey@gmail.com>
This commit is contained in:
Chris Vest 2021-02-15 13:13:44 +01:00 committed by GitHub
parent 9cac18687d
commit 56adab2743
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
46 changed files with 426 additions and 208 deletions

View File

@ -28,6 +28,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
@ -780,7 +781,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
forceFlush(ctx);
// Explicit start handshake processing once we send the first message. This will also ensure
// we will schedule the timeout if needed.
startHandshakeProcessing();
startHandshakeProcessing(true);
return;
}
@ -1957,20 +1958,26 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(ctx.channel(), 16);
if (ctx.channel().isActive()) {
startHandshakeProcessing();
Channel channel = ctx.channel();
pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(channel, 16);
boolean fastOpen = Boolean.TRUE.equals(channel.config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT));
boolean active = channel.isActive();
if (active || fastOpen) {
// Do not flush the handshake when TCP Fast Open is enabled, unless the channel is active.
// With TCP Fast Open, we write to the outbound buffer before the TCP connect is established.
// The buffer will then be flushed as part of estabilishing the connection, saving us a round-trip.
startHandshakeProcessing(active || !fastOpen);
}
}
private void startHandshakeProcessing() {
private void startHandshakeProcessing(boolean flushAtEnd) {
if (!handshakeStarted) {
handshakeStarted = true;
if (engine.getUseClientMode()) {
// Begin the initial handshake.
// channelActive() event has been fired already, which means this.channelActive() will
// not be invoked. We have to initialize here instead.
handshake();
handshake(flushAtEnd);
}
applyHandshakeTimeout();
}
@ -2022,28 +2029,30 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
oldHandshakePromise.addListener(new PromiseNotifier<Channel, Future<Channel>>(newHandshakePromise));
} else {
handshakePromise = newHandshakePromise;
handshake();
handshake(true);
applyHandshakeTimeout();
}
}
/**
* Performs TLS (re)negotiation.
* @param flushAtEnd Set to {@code true} if the outbound buffer should be flushed (written to the network) at the
* end. Set to {@code false} if the handshake will be flushed later, e.g. as part of TCP Fast Open
* connect.
*/
private void handshake() {
private void handshake(boolean flushAtEnd) {
if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
// Not all SSLEngine implementations support calling beginHandshake multiple times while a handshake
// is in progress. See https://github.com/netty/netty/issues/4718.
return;
} else {
if (handshakePromise.isDone()) {
// If the handshake is done already lets just return directly as there is no need to trigger it again.
// This can happen if the handshake(...) was triggered before we called channelActive(...) by a
// flush() that was triggered by a ChannelFutureListener that was added to the ChannelFuture returned
// from the connect(...) method. In this case we will see the flush() happen before we had a chance to
// call fireChannelActive() on the pipeline.
return;
}
}
if (handshakePromise.isDone()) {
// If the handshake is done already lets just return directly as there is no need to trigger it again.
// This can happen if the handshake(...) was triggered before we called channelActive(...) by a
// flush() that was triggered by a ChannelFutureListener that was added to the ChannelFuture returned
// from the connect(...) method. In this case we will see the flush() happen before we had a chance to
// call fireChannelActive() on the pipeline.
return;
}
// Begin handshake.
@ -2054,7 +2063,9 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
} catch (Throwable e) {
setHandshakeFailure(ctx, e);
} finally {
forceFlush(ctx);
if (flushAtEnd) {
forceFlush(ctx);
}
}
}
@ -2105,7 +2116,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
if (!startTls) {
startHandshakeProcessing();
startHandshakeProcessing(true);
}
ctx.fireChannelActive();
}

View File

@ -17,19 +17,36 @@ package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.StringUtil;
import org.junit.AssumptionViolatedException;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import static org.junit.Assert.*;
import static io.netty.buffer.ByteBufUtil.writeAscii;
import static io.netty.buffer.UnpooledByteBufAllocator.DEFAULT;
import static io.netty.util.CharsetUtil.US_ASCII;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class SocketConnectTest extends AbstractSocketTest {
@ -110,8 +127,93 @@ public class SocketConnectTest extends AbstractSocketTest {
}
}
@Test(timeout = 3000)
public void testWriteWithFastOpenBeforeConnect() throws Throwable {
run();
}
public void testWriteWithFastOpenBeforeConnect(ServerBootstrap sb, Bootstrap cb) throws Throwable {
enableTcpFastOpen(sb, cb);
sb.childOption(ChannelOption.AUTO_READ, true);
cb.option(ChannelOption.AUTO_READ, true);
sb.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
});
Channel sc = sb.bind().sync().channel();
connectAndVerifyDataTransfer(cb, sc);
connectAndVerifyDataTransfer(cb, sc);
}
private static void connectAndVerifyDataTransfer(Bootstrap cb, Channel sc)
throws InterruptedException {
BufferingClientHandler handler = new BufferingClientHandler();
cb.handler(handler);
ChannelFuture register = cb.register();
Channel channel = register.sync().channel();
ChannelFuture write = channel.write(writeAscii(DEFAULT, "[fastopen]"));
SocketAddress remoteAddress = sc.localAddress();
ChannelFuture connectFuture = channel.connect(remoteAddress);
Channel cc = connectFuture.sync().channel();
cc.writeAndFlush(writeAscii(DEFAULT, "[normal data]")).sync();
write.sync();
String expectedString = "[fastopen][normal data]";
String result = handler.collectBuffer(expectedString.getBytes(US_ASCII).length);
cc.disconnect().sync();
assertEquals(expectedString, result);
}
protected void enableTcpFastOpen(ServerBootstrap sb, Bootstrap cb) {
throw new AssumptionViolatedException(
"Support for testing TCP_FASTOPEN not enabled for " + StringUtil.simpleClassName(this));
}
private static void assertLocalAddress(InetSocketAddress address) {
assertTrue(address.getPort() > 0);
assertFalse(address.getAddress().isAnyLocalAddress());
}
private static class BufferingClientHandler extends ChannelInboundHandlerAdapter {
private final Semaphore semaphore = new Semaphore(0);
private final ByteArrayOutputStream streamBuffer = new ByteArrayOutputStream();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
buf.readBytes(streamBuffer, readableBytes);
semaphore.release(readableBytes);
buf.release();
} else {
throw new IllegalArgumentException("Unexpected message type: " + msg);
}
}
String collectBuffer(int expectedBytes) throws InterruptedException {
semaphore.acquire(expectedBytes);
byte[] bytes = streamBuffer.toByteArray();
streamBuffer.reset();
return new String(bytes, US_ASCII);
}
}
private static final class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf buffer = ctx.alloc().buffer();
ByteBuf buf = (ByteBuf) msg;
buffer.writeBytes(buf);
buf.release();
ctx.channel().writeAndFlush(buffer);
} else {
throw new IllegalArgumentException("Unexpected message type: " + msg);
}
}
}
}

View File

@ -65,7 +65,7 @@ public class SocketMultipleConnectTest extends AbstractSocketTest {
List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> factories
= new ArrayList<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>>();
for (TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap> comboFactory
: SocketTestPermutation.INSTANCE.socket()) {
: SocketTestPermutation.INSTANCE.socketWithFastOpen()) {
if (comboFactory.newClientInstance().config().group() instanceof NioEventLoopGroup) {
factories.add(comboFactory);
}

View File

@ -40,7 +40,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class SocketReadPendingTest extends AbstractSocketTest {
@Test(timeout = 30000)
@Test(timeout = 60000)
public void testReadPendingIsResetAfterEachRead() throws Throwable {
run();
}

View File

@ -113,6 +113,22 @@ public class SocketTestPermutation {
return list;
}
public List<BootstrapComboFactory<ServerBootstrap, Bootstrap>> socketWithFastOpen() {
// Make the list of ServerBootstrap factories.
List<BootstrapFactory<ServerBootstrap>> sbfs = serverSocket();
// Make the list of Bootstrap factories.
List<BootstrapFactory<Bootstrap>> cbfs = clientSocketWithFastOpen();
// Populate the combinations
List<BootstrapComboFactory<ServerBootstrap, Bootstrap>> list = combo(sbfs, cbfs);
// Remove the OIO-OIO case which often leads to a dead lock by its nature.
list.remove(list.size() - 1);
return list;
}
public List<BootstrapComboFactory<Bootstrap, Bootstrap>> datagram(final InternetProtocolFamily family) {
// Make the list of Bootstrap factories.
List<BootstrapFactory<Bootstrap>> bfs = Arrays.asList(
@ -183,6 +199,10 @@ public class SocketTestPermutation {
);
}
public List<BootstrapFactory<Bootstrap>> clientSocketWithFastOpen() {
return clientSocket();
}
public List<BootstrapFactory<Bootstrap>> datagramSocket() {
return Arrays.asList(
new BootstrapFactory<Bootstrap>() {

View File

@ -124,10 +124,6 @@ static void netty_epoll_linuxsocket_setTcpFastOpen(JNIEnv* env, jclass clazz, ji
netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_FASTOPEN, &optval, sizeof(optval));
}
static void netty_epoll_linuxsocket_setTcpFastOpenConnect(JNIEnv* env, jclass clazz, jint fd, jint optval) {
netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_FASTOPEN_CONNECT, &optval, sizeof(optval));
}
static void netty_epoll_linuxsocket_setTcpKeepIdle(JNIEnv* env, jclass clazz, jint fd, jint optval) {
netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
}
@ -596,20 +592,6 @@ static jint netty_epoll_linuxsocket_isTcpQuickAck(JNIEnv* env, jclass clazz, jin
return optval;
}
static jint netty_epoll_linuxsocket_isTcpFastOpenConnect(JNIEnv* env, jclass clazz, jint fd) {
int optval;
// We call netty_unix_socket_getOption0 directly so we can handle ENOPROTOOPT by ourself.
if (netty_unix_socket_getOption0(fd, IPPROTO_TCP, TCP_FASTOPEN_CONNECT, &optval, sizeof(optval)) == -1) {
if (errno == ENOPROTOOPT) {
// Not supported by the system, so just return 0.
return 0;
}
netty_unix_socket_getOptionHandleError(env, errno);
return -1;
}
return optval;
}
static jint netty_epoll_linuxsocket_getTcpNotSentLowAt(JNIEnv* env, jclass clazz, jint fd) {
int optval;
if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &optval, sizeof(optval)) == -1) {
@ -681,8 +663,6 @@ static const JNINativeMethod fixed_method_table[] = {
{ "getTcpNotSentLowAt", "(I)I", (void *) netty_epoll_linuxsocket_getTcpNotSentLowAt },
{ "isTcpQuickAck", "(I)I", (void *) netty_epoll_linuxsocket_isTcpQuickAck },
{ "setTcpFastOpen", "(II)V", (void *) netty_epoll_linuxsocket_setTcpFastOpen },
{ "setTcpFastOpenConnect", "(II)V", (void *) netty_epoll_linuxsocket_setTcpFastOpenConnect },
{ "isTcpFastOpenConnect", "(I)I", (void *) netty_epoll_linuxsocket_isTcpFastOpenConnect },
{ "setTcpKeepIdle", "(II)V", (void *) netty_epoll_linuxsocket_setTcpKeepIdle },
{ "setTcpKeepIntvl", "(II)V", (void *) netty_epoll_linuxsocket_setTcpKeepIntvl },
{ "setTcpKeepCnt", "(II)V", (void *) netty_epoll_linuxsocket_setTcpKeepCnt },

View File

@ -31,11 +31,11 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.AbstractNioChannel;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.Socket;
import io.netty.channel.unix.UnixChannel;
import io.netty.util.ReferenceCountUtil;
@ -377,6 +377,43 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
return WRITE_STATUS_SNDBUF_FULL;
}
/**
* Write bytes to the socket, with or without a remote address.
* Used for datagram and TCP client fast open writes.
*/
final long doWriteOrSendBytes(ByteBuf data, InetSocketAddress remoteAddress, boolean fastOpen)
throws IOException {
assert !(fastOpen && remoteAddress == null) : "fastOpen requires a remote address";
if (data.hasMemoryAddress()) {
long memoryAddress = data.memoryAddress();
if (remoteAddress == null) {
return socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
}
return socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
}
if (data.nioBufferCount() > 1) {
IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
array.add(data, data.readerIndex(), data.readableBytes());
int cnt = array.count();
assert cnt != 0;
if (remoteAddress == null) {
return socket.writevAddresses(array.memoryAddress(0), cnt);
}
return socket.sendToAddresses(array.memoryAddress(0), cnt,
remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
}
ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
if (remoteAddress == null) {
return socket.write(nioData, nioData.position(), nioData.limit());
}
return socket.sendTo(nioData, nioData.position(), nioData.limit(),
remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
}
protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
boolean readPending;
boolean maybeMoreDataToRead;
@ -730,7 +767,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
return connected;
}
private boolean doConnect0(SocketAddress remote) throws Exception {
boolean doConnect0(SocketAddress remote) throws Exception {
boolean success = false;
try {
boolean connected = socket.connect(remote);

View File

@ -33,8 +33,12 @@ public final class EpollChannelOption<T> extends UnixChannelOption<T> {
public static final ChannelOption<Boolean> IP_TRANSPARENT = valueOf("IP_TRANSPARENT");
public static final ChannelOption<Boolean> IP_RECVORIGDSTADDR = valueOf("IP_RECVORIGDSTADDR");
public static final ChannelOption<Integer> TCP_FASTOPEN = valueOf(EpollChannelOption.class, "TCP_FASTOPEN");
public static final ChannelOption<Boolean> TCP_FASTOPEN_CONNECT =
valueOf(EpollChannelOption.class, "TCP_FASTOPEN_CONNECT");
/**
* @deprecated Use {@link ChannelOption#TCP_FASTOPEN_CONNECT} instead.
*/
@Deprecated
public static final ChannelOption<Boolean> TCP_FASTOPEN_CONNECT = ChannelOption.TCP_FASTOPEN_CONNECT;
public static final ChannelOption<Integer> TCP_DEFER_ACCEPT =
ChannelOption.valueOf(EpollChannelOption.class, "TCP_DEFER_ACCEPT");
public static final ChannelOption<Boolean> TCP_QUICKACK = valueOf(EpollChannelOption.class, "TCP_QUICKACK");

View File

@ -32,7 +32,6 @@ import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.unix.DatagramSocketAddress;
import io.netty.channel.unix.Errors;
import io.netty.channel.unix.Errors.NativeIoException;
import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.Socket;
import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.ReferenceCountUtil;
@ -350,7 +349,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
private boolean doWriteMessage(Object msg) throws Exception {
final ByteBuf data;
InetSocketAddress remoteAddress;
final InetSocketAddress remoteAddress;
if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
@ -367,38 +366,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
return true;
}
final long writtenBytes;
if (data.hasMemoryAddress()) {
long memoryAddress = data.memoryAddress();
if (remoteAddress == null) {
writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
} else {
writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
remoteAddress.getAddress(), remoteAddress.getPort());
}
} else if (data.nioBufferCount() > 1) {
IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
array.add(data, data.readerIndex(), data.readableBytes());
int cnt = array.count();
assert cnt != 0;
if (remoteAddress == null) {
writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
} else {
writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt,
remoteAddress.getAddress(), remoteAddress.getPort());
}
} else {
ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
if (remoteAddress == null) {
writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
} else {
writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
remoteAddress.getAddress(), remoteAddress.getPort());
}
}
return writtenBytes > 0;
return doWriteOrSendBytes(data, remoteAddress, false) > 0;
}
@Override

View File

@ -15,8 +15,10 @@
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GlobalEventExecutor;
@ -24,6 +26,7 @@ import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
@ -111,6 +114,29 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
return new EpollSocketChannelUnsafe();
}
@Override
boolean doConnect0(SocketAddress remote) throws Exception {
if (Native.IS_SUPPORTING_TCP_FASTOPEN && config.isTcpFastOpenConnect()) {
ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
outbound.addFlush();
Object curr;
if ((curr = outbound.current()) instanceof ByteBuf) {
ByteBuf initialData = (ByteBuf) curr;
// If no cookie is present, the write fails with EINPROGRESS and this call basically
// becomes a normal async connect. All writes will be sent normally afterwards.
long localFlushedAmount = doWriteOrSendBytes(
initialData, (InetSocketAddress) remote, true);
if (localFlushedAmount > 0) {
// We had a cookie and our fast-open proceeded. Remove written data
// then continue with normal TCP operation.
outbound.removeBytes(localFlushedAmount);
return true;
}
}
}
return super.doConnect0(remote);
}
private final class EpollSocketChannelUnsafe extends EpollStreamUnsafe {
@Override
protected Executor prepareToClose() {

View File

@ -39,6 +39,7 @@ import static io.netty.channel.ChannelOption.TCP_NODELAY;
public final class EpollSocketChannelConfig extends EpollChannelConfig implements SocketChannelConfig {
private volatile boolean allowHalfClosure;
private volatile boolean tcpFastopen;
/**
* Creates a new instance.
@ -60,7 +61,7 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement
ALLOW_HALF_CLOSURE, EpollChannelOption.TCP_CORK, EpollChannelOption.TCP_NOTSENT_LOWAT,
EpollChannelOption.TCP_KEEPCNT, EpollChannelOption.TCP_KEEPIDLE, EpollChannelOption.TCP_KEEPINTVL,
EpollChannelOption.TCP_MD5SIG, EpollChannelOption.TCP_QUICKACK, EpollChannelOption.IP_TRANSPARENT,
EpollChannelOption.TCP_FASTOPEN_CONNECT, EpollChannelOption.SO_BUSY_POLL);
ChannelOption.TCP_FASTOPEN_CONNECT, EpollChannelOption.SO_BUSY_POLL);
}
@SuppressWarnings("unchecked")
@ -114,7 +115,7 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement
if (option == EpollChannelOption.IP_TRANSPARENT) {
return (T) Boolean.valueOf(isIpTransparent());
}
if (option == EpollChannelOption.TCP_FASTOPEN_CONNECT) {
if (option == ChannelOption.TCP_FASTOPEN_CONNECT) {
return (T) Boolean.valueOf(isTcpFastOpenConnect());
}
if (option == EpollChannelOption.SO_BUSY_POLL) {
@ -163,7 +164,7 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement
setTcpMd5Sig(m);
} else if (option == EpollChannelOption.TCP_QUICKACK) {
setTcpQuickAck((Boolean) value);
} else if (option == EpollChannelOption.TCP_FASTOPEN_CONNECT) {
} else if (option == ChannelOption.TCP_FASTOPEN_CONNECT) {
setTcpFastOpenConnect((Boolean) value);
} else if (option == EpollChannelOption.SO_BUSY_POLL) {
setSoBusyPoll((Integer) value);
@ -551,29 +552,21 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement
}
/**
* Set the {@code TCP_FASTOPEN_CONNECT} option on the socket. Requires Linux kernel 4.11 or later.
* See
* <a href="https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=19f6d3f3">this commit</a>
* for more details.
* Enables client TCP fast open. {@code TCP_FASTOPEN_CONNECT} normally
* requires Linux kernel 4.11 or later, so instead we use the traditional fast open
* client socket mechanics that work with kernel 3.6 and later. See this
* <a href="https://lwn.net/Articles/508865/">LWN article</a> for more info.
*/
public EpollSocketChannelConfig setTcpFastOpenConnect(boolean fastOpenConnect) {
try {
((EpollSocketChannel) channel).socket.setTcpFastOpenConnect(fastOpenConnect);
return this;
} catch (IOException e) {
throw new ChannelException(e);
}
tcpFastopen = fastOpenConnect;
return this;
}
/**
* Returns {@code true} if {@code TCP_FASTOPEN_CONNECT} is enabled, {@code false} otherwise.
* Returns {@code true} if TCP fast open is enabled, {@code false} otherwise.
*/
public boolean isTcpFastOpenConnect() {
try {
return ((EpollSocketChannel) channel).socket.isTcpFastOpenConnect();
} catch (IOException e) {
throw new ChannelException(e);
}
return tcpFastopen;
}
@Override

View File

@ -179,14 +179,6 @@ final class LinuxSocket extends Socket {
setTcpFastOpen(intValue(), tcpFastopenBacklog);
}
void setTcpFastOpenConnect(boolean tcpFastOpenConnect) throws IOException {
setTcpFastOpenConnect(intValue(), tcpFastOpenConnect ? 1 : 0);
}
boolean isTcpFastOpenConnect() throws IOException {
return isTcpFastOpenConnect(intValue()) != 0;
}
void setTcpKeepIdle(int seconds) throws IOException {
setTcpKeepIdle(intValue(), seconds);
}
@ -369,7 +361,6 @@ final class LinuxSocket extends Socket {
private static native int isIpRecvOrigDestAddr(int fd) throws IOException;
private static native void getTcpInfo(int fd, long[] array) throws IOException;
private static native PeerCredentials getPeerCredentials(int fd) throws IOException;
private static native int isTcpFastOpenConnect(int fd) throws IOException;
private static native void setTcpDeferAccept(int fd, int deferAccept) throws IOException;
private static native void setTcpQuickAck(int fd, int quickAck) throws IOException;
@ -377,7 +368,6 @@ final class LinuxSocket extends Socket {
private static native void setSoBusyPoll(int fd, int loopMicros) throws IOException;
private static native void setTcpNotSentLowAt(int fd, int tcpNotSentLowAt) throws IOException;
private static native void setTcpFastOpen(int fd, int tcpFastopenBacklog) throws IOException;
private static native void setTcpFastOpenConnect(int fd, int tcpFastOpenConnect) throws IOException;
private static native void setTcpKeepIdle(int fd, int seconds) throws IOException;
private static native void setTcpKeepIntvl(int fd, int seconds) throws IOException;
private static native void setTcpKeepCnt(int fd, int probes) throws IOException;

View File

@ -26,7 +26,7 @@ import java.util.List;
public class EpollCompositeBufferGatheringWriteTest extends CompositeBufferGatheringWriteTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen();
}
@Override

View File

@ -26,7 +26,7 @@ import java.util.List;
public class EpollETSocketAutoReadTest extends SocketAutoReadTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
}
@Override

View File

@ -26,7 +26,7 @@ import java.util.List;
public class EpollETSocketConditionalWritabilityTest extends SocketConditionalWritabilityTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
}
@Override

View File

@ -26,7 +26,7 @@ import java.util.List;
public class EpollETSocketDataReadInitialStateTest extends SocketDataReadInitialStateTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
}
@Override

View File

@ -26,7 +26,7 @@ import java.util.List;
public class EpollETSocketExceptionHandlingTest extends SocketExceptionHandlingTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
}
@Override

View File

@ -26,7 +26,7 @@ import java.util.List;
public class EpollETSocketHalfClosed extends SocketHalfClosedTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen();
}
@Override

View File

@ -26,7 +26,7 @@ import java.util.List;
public class EpollETSocketReadPendingTest extends SocketReadPendingTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
}
@Override

View File

@ -26,7 +26,7 @@ import java.util.List;
public class EpollLTSocketAutoReadTest extends SocketAutoReadTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
}
@Override

View File

@ -26,7 +26,7 @@ import java.util.List;
public class EpollLTSocketConditionalWritabilityTest extends SocketConditionalWritabilityTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
}
@Override

View File

@ -26,7 +26,7 @@ import java.util.List;
public class EpollLTSocketDataReadInitialStateTest extends SocketDataReadInitialStateTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
}
@Override

View File

@ -26,7 +26,7 @@ import java.util.List;
public class EpollLTSocketExceptionHandlingTest extends SocketExceptionHandlingTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
}
@Override

View File

@ -26,7 +26,7 @@ import java.util.List;
public class EpollLTSocketHalfClosed extends SocketHalfClosedTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen();
}
@Override

View File

@ -26,7 +26,7 @@ import java.util.List;
public class EpollLTSocketReadPendingTest extends SocketReadPendingTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
}
@Override

View File

@ -24,6 +24,6 @@ import java.util.List;
public class EpollSocketChannelNotYetConnectedTest extends SocketChannelNotYetConnectedTest {
@Override
protected List<TestsuitePermutation.BootstrapFactory<Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.clientSocket();
return EpollSocketTestPermutation.INSTANCE.clientSocketWithFastOpen();
}
}

View File

@ -25,6 +25,6 @@ import java.util.List;
public class EpollSocketCloseForciblyTest extends SocketCloseForciblyTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen();
}
}

View File

@ -17,6 +17,7 @@ package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketConnectTest;
@ -26,6 +27,12 @@ public class EpollSocketConnectTest extends SocketConnectTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen();
}
@Override
protected void enableTcpFastOpen(ServerBootstrap sb, Bootstrap cb) {
sb.childOption(EpollChannelOption.TCP_FASTOPEN, 5);
cb.option(ChannelOption.TCP_FASTOPEN_CONNECT, true);
}
}

View File

@ -26,6 +26,6 @@ public class EpollSocketEchoTest extends SocketEchoTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
}
}

View File

@ -26,6 +26,6 @@ public class EpollSocketFileRegionTest extends SocketFileRegionTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
}
}

View File

@ -26,6 +26,6 @@ public class EpollSocketFixedLengthEchoTest extends SocketFixedLengthEchoTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
}
}

View File

@ -26,6 +26,6 @@ public class EpollSocketGatheringWriteTest extends SocketGatheringWriteTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen();
}
}

View File

@ -32,7 +32,7 @@ public class EpollSocketMultipleConnectTest extends SocketMultipleConnectTest {
List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> factories
= new ArrayList<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>>();
for (TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap> comboFactory
: EpollSocketTestPermutation.INSTANCE.socket()) {
: EpollSocketTestPermutation.INSTANCE.socketWithFastOpen()) {
EventLoopGroup group = comboFactory.newClientInstance().config().group();
if (group instanceof NioEventLoopGroup || group instanceof EpollEventLoopGroup) {
factories.add(comboFactory);

View File

@ -26,6 +26,6 @@ public class EpollSocketObjectEchoTest extends SocketObjectEchoTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
}
}

View File

@ -32,7 +32,7 @@ import static org.junit.Assert.assertTrue;
public class EpollSocketRstTest extends SocketRstTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen();
}
@Override

View File

@ -31,6 +31,6 @@ public class EpollSocketSslClientRenegotiateTest extends SocketSslClientRenegoti
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
}
}

View File

@ -36,6 +36,6 @@ public class EpollSocketSslEchoTest extends SocketSslEchoTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
}
}

View File

@ -31,6 +31,6 @@ public class EpollSocketSslGreetingTest extends SocketSslGreetingTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
}
}

View File

@ -31,6 +31,6 @@ public class EpollSocketSslSessionReuseTest extends SocketSslSessionReuseTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
}
}

View File

@ -31,6 +31,6 @@ public class EpollSocketStartTlsTest extends SocketStartTlsTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
}
}

View File

@ -26,6 +26,6 @@ public class EpollSocketStringEchoTest extends SocketStringEchoTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
}
}

View File

@ -19,6 +19,7 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;
@ -53,10 +54,21 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
new EpollEventLoopGroup(WORKERS, new DefaultThreadFactory("testsuite-epoll-worker", true));
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollSocketTestPermutation.class);
// Constants describing if/how TCP Fast Open is allowed to work on Linux:
private static final int TFO_ENABLED_CLIENT = 1;
private static final int TFO_ENABLED_SERVER = 2;
@Override
public List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> socket() {
List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> list =
combo(serverSocket(), clientSocketWithFastOpen());
list.remove(list.size() - 1); // Exclude NIO x NIO test
return list;
}
public List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> socketWithoutFastOpen() {
List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> list =
combo(serverSocket(), clientSocket());
@ -76,7 +88,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
.channel(EpollServerSocketChannel.class);
}
});
if (isServerFastOpen()) {
if (isFastOpen()) {
toReturn.add(new BootstrapFactory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
@ -98,30 +110,49 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
return toReturn;
}
@SuppressWarnings("unchecked")
@Override
public List<BootstrapFactory<Bootstrap>> clientSocket() {
return Arrays.asList(
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollSocketChannel.class);
}
},
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class);
}
List<BootstrapFactory<Bootstrap>> toReturn = new ArrayList<BootstrapFactory<Bootstrap>>();
toReturn.add(new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollSocketChannel.class);
}
});
toReturn.add(new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class);
}
});
return toReturn;
}
@Override
public List<BootstrapFactory<Bootstrap>> clientSocketWithFastOpen() {
List<BootstrapFactory<Bootstrap>> factories = clientSocket();
if (isFastOpen()) {
int insertIndex = factories.size() - 1; // Keep NIO fixture last.
factories.add(insertIndex, new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollSocketChannel.class)
.option(ChannelOption.TCP_FASTOPEN_CONNECT, true);
}
);
});
}
return factories;
}
@Override
public List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> datagram(
final InternetProtocolFamily family) {
// Make the list of Bootstrap factories.
@SuppressWarnings("unchecked")
List<BootstrapFactory<Bootstrap>> bfs = Arrays.asList(
new BootstrapFactory<Bootstrap>() {
@Override
@ -165,7 +196,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
Collections.singletonList(datagramBootstrapFactory(family)));
}
private BootstrapFactory<Bootstrap> datagramBootstrapFactory(final InternetProtocolFamily family) {
private static BootstrapFactory<Bootstrap> datagramBootstrapFactory(final InternetProtocolFamily family) {
return new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
@ -226,8 +257,8 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
);
}
public boolean isServerFastOpen() {
return AccessController.doPrivileged(new PrivilegedAction<Integer>() {
public boolean isFastOpen() {
int tfoEnabled = AccessController.doPrivileged(new PrivilegedAction<Integer>() {
@Override
public Integer run() {
int fastopen = 0;
@ -237,9 +268,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
try {
in = new BufferedReader(new FileReader(file));
fastopen = Integer.parseInt(in.readLine());
if (logger.isDebugEnabled()) {
logger.debug("{}: {}", file, fastopen);
}
logger.debug("{}: {}", file, fastopen);
} catch (Exception e) {
logger.debug("Failed to get TCP_FASTOPEN from: {}", file, e);
} finally {
@ -252,13 +281,13 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
}
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("{}: {} (non-existent)", file, fastopen);
}
logger.debug("{}: {} (non-existent)", file, fastopen);
}
return fastopen;
}
}) == 3;
});
// TCP Fast Open needs to be enabled for both clients and servers, before we can test our intergration with it.
return tfoEnabled == TFO_ENABLED_CLIENT + TFO_ENABLED_SERVER;
}
public static DomainSocketAddress newSocketAddress() {

View File

@ -25,6 +25,6 @@ public class EpollWriteBeforeRegisteredTest extends WriteBeforeRegisteredTest {
@Override
protected List<TestsuitePermutation.BootstrapFactory<Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.clientSocket();
return EpollSocketTestPermutation.INSTANCE.clientSocketWithFastOpen();
}
}

View File

@ -39,6 +39,11 @@
#define SO_REUSEPORT 15
#endif /* SO_REUSEPORT */
// MSG_FASTOPEN is defined in linux 3.6. We define this here so older kernels can compile.
#ifndef MSG_FASTOPEN
#define MSG_FASTOPEN 0x20000000
#endif
static jclass datagramSocketAddressClass = NULL;
static jmethodID datagramSocketAddrMethodId = NULL;
static jmethodID inetSocketAddrMethodId = NULL;
@ -309,7 +314,7 @@ int netty_unix_socket_initSockaddr(JNIEnv* env, jboolean ipv6, jbyteArray addres
return 0;
}
static jint _sendTo(JNIEnv* env, jint fd, jboolean ipv6, void* buffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) {
static jint _sendTo(JNIEnv* env, jint fd, jboolean ipv6, void* buffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port, jint flags) {
struct sockaddr_storage addr;
socklen_t addrSize;
if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr, &addrSize) == -1) {
@ -319,7 +324,7 @@ static jint _sendTo(JNIEnv* env, jint fd, jboolean ipv6, void* buffer, jint pos,
ssize_t res;
int err;
do {
res = sendto(fd, buffer + pos, (size_t) (limit - pos), 0, (struct sockaddr*) &addr, addrSize);
res = sendto(fd, buffer + pos, (size_t) (limit - pos), flags, (struct sockaddr*) &addr, addrSize);
// keep on writing if it was interrupted
} while (res == -1 && ((err = errno) == EINTR));
@ -623,16 +628,16 @@ static jint netty_unix_socket_newSocketDomainFd(JNIEnv* env, jclass clazz) {
return fd;
}
static jint netty_unix_socket_sendTo(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) {
static jint netty_unix_socket_sendTo(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port, jint flags) {
// We check that GetDirectBufferAddress will not return NULL in OnLoad
return _sendTo(env, fd, ipv6, (*env)->GetDirectBufferAddress(env, jbuffer), pos, limit, address, scopeId, port);
return _sendTo(env, fd, ipv6, (*env)->GetDirectBufferAddress(env, jbuffer), pos, limit, address, scopeId, port, flags);
}
static jint netty_unix_socket_sendToAddress(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) {
return _sendTo(env, fd, ipv6, (void *) (intptr_t) memoryAddress, pos, limit, address, scopeId, port);
static jint netty_unix_socket_sendToAddress(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port, jint flags) {
return _sendTo(env, fd, ipv6, (void *) (intptr_t) memoryAddress, pos, limit, address, scopeId, port, flags);
}
static jint netty_unix_socket_sendToAddresses(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port) {
static jint netty_unix_socket_sendToAddresses(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port, jint flags) {
struct sockaddr_storage addr;
socklen_t addrSize;
if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr, &addrSize) == -1) {
@ -648,7 +653,7 @@ static jint netty_unix_socket_sendToAddresses(JNIEnv* env, jclass clazz, jint fd
ssize_t res;
int err;
do {
res = sendmsg(fd, &m, 0);
res = sendmsg(fd, &m, flags);
// keep on writing if it was interrupted
} while (res == -1 && ((err = errno) == EINTR));
@ -965,6 +970,9 @@ static jint netty_unix_socket_isBroadcast(JNIEnv* env, jclass clazz, jint fd) {
return optval;
}
static jint netty_unit_socket_msgFastopen(JNIEnv* env, jclass clazz) {
return MSG_FASTOPEN;
}
// JNI Registered Methods End
@ -982,9 +990,9 @@ static const JNINativeMethod fixed_method_table[] = {
{ "newSocketDgramFd", "(Z)I", (void *) netty_unix_socket_newSocketDgramFd },
{ "newSocketStreamFd", "(Z)I", (void *) netty_unix_socket_newSocketStreamFd },
{ "newSocketDomainFd", "()I", (void *) netty_unix_socket_newSocketDomainFd },
{ "sendTo", "(IZLjava/nio/ByteBuffer;II[BII)I", (void *) netty_unix_socket_sendTo },
{ "sendToAddress", "(IZJII[BII)I", (void *) netty_unix_socket_sendToAddress },
{ "sendToAddresses", "(IZJI[BII)I", (void *) netty_unix_socket_sendToAddresses },
{ "sendTo", "(IZLjava/nio/ByteBuffer;II[BIII)I", (void *) netty_unix_socket_sendTo },
{ "sendToAddress", "(IZJII[BIII)I", (void *) netty_unix_socket_sendToAddress },
{ "sendToAddresses", "(IZJI[BIII)I", (void *) netty_unix_socket_sendToAddresses },
// "recvFrom" has a dynamic signature
// "recvFromAddress" has a dynamic signature
{ "recvFd", "(I)I", (void *) netty_unix_socket_recvFd },
@ -1012,7 +1020,8 @@ static const JNINativeMethod fixed_method_table[] = {
{ "getSoError", "(I)I", (void *) netty_unix_socket_getSoError },
{ "initialize", "(Z)V", (void *) netty_unix_socket_initialize },
{ "isIPv6Preferred", "()Z", (void *) netty_unix_socket_isIPv6Preferred },
{ "isIPv6", "(I)Z", (void *) netty_unix_socket_isIPv6 }
{ "isIPv6", "(I)Z", (void *) netty_unix_socket_isIPv6 },
{ "msgFastopen", "()I", (void *) netty_unit_socket_msgFastopen }
};
static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]);

View File

@ -113,29 +113,10 @@ public class Socket extends FileDescriptor {
}
public final int sendTo(ByteBuffer buf, int pos, int limit, InetAddress addr, int port) throws IOException {
// just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected
// to be called frequently
byte[] address;
int scopeId;
if (addr instanceof Inet6Address) {
address = addr.getAddress();
scopeId = ((Inet6Address) addr).getScopeId();
} else {
// convert to ipv4 mapped ipv6 address;
scopeId = 0;
address = ipv4MappedIpv6Address(addr.getAddress());
}
int res = sendTo(fd, useIpv6(addr), buf, pos, limit, address, scopeId, port);
if (res >= 0) {
return res;
}
if (res == ERROR_ECONNREFUSED_NEGATIVE) {
throw new PortUnreachableException("sendTo failed");
}
return ioResult("sendTo", res);
return sendTo(buf, pos, limit, addr, port, false);
}
public final int sendToAddress(long memoryAddress, int pos, int limit, InetAddress addr, int port)
public final int sendTo(ByteBuffer buf, int pos, int limit, InetAddress addr, int port, boolean fastOpen)
throws IOException {
// just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected
// to be called frequently
@ -149,17 +130,30 @@ public class Socket extends FileDescriptor {
scopeId = 0;
address = ipv4MappedIpv6Address(addr.getAddress());
}
int res = sendToAddress(fd, useIpv6(addr), memoryAddress, pos, limit, address, scopeId, port);
int flags = fastOpen ? msgFastopen() : 0;
int res = sendTo(fd, useIpv6(addr), buf, pos, limit, address, scopeId, port, flags);
if (res >= 0) {
return res;
}
if (res == ERROR_ECONNREFUSED_NEGATIVE) {
throw new PortUnreachableException("sendToAddress failed");
if (res == ERRNO_EINPROGRESS_NEGATIVE && fastOpen) {
// This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection.
// In this case, our TCP connection will be established normally, but no data was transmitted at this time.
// We'll just transmit the data with normal writes later.
return 0;
}
return ioResult("sendToAddress", res);
if (res == ERROR_ECONNREFUSED_NEGATIVE) {
throw new PortUnreachableException("sendTo failed");
}
return ioResult("sendTo", res);
}
public final int sendToAddresses(long memoryAddress, int length, InetAddress addr, int port) throws IOException {
public final int sendToAddress(long memoryAddress, int pos, int limit, InetAddress addr, int port)
throws IOException {
return sendToAddress(memoryAddress, pos, limit, addr, port, false);
}
public final int sendToAddress(long memoryAddress, int pos, int limit, InetAddress addr, int port,
boolean fastOpen) throws IOException {
// just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected
// to be called frequently
byte[] address;
@ -172,11 +166,52 @@ public class Socket extends FileDescriptor {
scopeId = 0;
address = ipv4MappedIpv6Address(addr.getAddress());
}
int res = sendToAddresses(fd, useIpv6(addr), memoryAddress, length, address, scopeId, port);
int flags = fastOpen ? msgFastopen() : 0;
int res = sendToAddress(fd, useIpv6(addr), memoryAddress, pos, limit, address, scopeId, port, flags);
if (res >= 0) {
return res;
}
if (res == ERRNO_EINPROGRESS_NEGATIVE && fastOpen) {
// This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection.
// In this case, our TCP connection will be established normally, but no data was transmitted at this time.
// We'll just transmit the data with normal writes later.
return 0;
}
if (res == ERROR_ECONNREFUSED_NEGATIVE) {
throw new PortUnreachableException("sendToAddress failed");
}
return ioResult("sendToAddress", res);
}
public final int sendToAddresses(long memoryAddress, int length, InetAddress addr, int port) throws IOException {
return sendToAddresses(memoryAddress, length, addr, port, false);
}
public final int sendToAddresses(long memoryAddress, int length, InetAddress addr, int port, boolean fastOpen)
throws IOException {
// just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected
// to be called frequently
byte[] address;
int scopeId;
if (addr instanceof Inet6Address) {
address = addr.getAddress();
scopeId = ((Inet6Address) addr).getScopeId();
} else {
// convert to ipv4 mapped ipv6 address;
scopeId = 0;
address = ipv4MappedIpv6Address(addr.getAddress());
}
int flags = fastOpen ? msgFastopen() : 0;
int res = sendToAddresses(fd, useIpv6(addr), memoryAddress, length, address, scopeId, port, flags);
if (res >= 0) {
return res;
}
if (res == ERRNO_EINPROGRESS_NEGATIVE && fastOpen) {
// This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection.
// In this case, our TCP connection will be established normally, but no data was transmitted at this time.
// We'll just transmit the data with normal writes later.
return 0;
}
if (res == ERROR_ECONNREFUSED_NEGATIVE) {
throw new PortUnreachableException("sendToAddresses failed");
}
@ -467,11 +502,16 @@ public class Socket extends FileDescriptor {
private static native byte[] localAddress(int fd);
private static native int sendTo(
int fd, boolean ipv6, ByteBuffer buf, int pos, int limit, byte[] address, int scopeId, int port);
int fd, boolean ipv6, ByteBuffer buf, int pos, int limit, byte[] address, int scopeId, int port,
int flags);
private static native int sendToAddress(
int fd, boolean ipv6, long memoryAddress, int pos, int limit, byte[] address, int scopeId, int port);
int fd, boolean ipv6, long memoryAddress, int pos, int limit, byte[] address, int scopeId, int port,
int flags);
private static native int sendToAddresses(
int fd, boolean ipv6, long memoryAddress, int length, byte[] address, int scopeId, int port);
int fd, boolean ipv6, long memoryAddress, int length, byte[] address, int scopeId, int port,
int flags);
private static native DatagramSocketAddress recvFrom(
int fd, ByteBuffer buf, int pos, int limit) throws IOException;
@ -479,6 +519,7 @@ public class Socket extends FileDescriptor {
int fd, long memoryAddress, int pos, int limit) throws IOException;
private static native int recvFd(int fd);
private static native int sendFd(int socketFd, int fd);
private static native int msgFastopen();
private static native int newSocketStreamFd(boolean ipv6);
private static native int newSocketDgramFd(boolean ipv6);

View File

@ -24,7 +24,7 @@ import java.net.InetAddress;
import java.net.NetworkInterface;
/**
* A {@link ChannelOption} allows to configure a {@link ChannelConfig} in a type-safe
* A {@link ChannelOption} allows to configure a {@link ChannelConfig} in a type-safe
* way. Which {@link ChannelOption} is supported depends on the actual implementation
* of {@link ChannelConfig} and may depend on the nature of the transport it belongs
* to.
@ -125,6 +125,7 @@ public class ChannelOption<T> extends AbstractConstant<ChannelOption<T>> {
public static final ChannelOption<Boolean> IP_MULTICAST_LOOP_DISABLED = valueOf("IP_MULTICAST_LOOP_DISABLED");
public static final ChannelOption<Boolean> TCP_NODELAY = valueOf("TCP_NODELAY");
public static final ChannelOption<Boolean> TCP_FASTOPEN_CONNECT = valueOf("TCP_FASTOPEN_CONNECT");
@Deprecated
public static final ChannelOption<Boolean> DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION =