TCP Fast Open for clients (#11006)
Support TCP Fast Open for clients and make SslHandler take advantage Motivation: - TCP Fast Open allow us to send a small amount of data along side the initial SYN packet when establishing a TCP connection. - The TLS Client Hello packet is small enough to fit in there, and is also idempotent (another requirement for using TCP Fast Open), so if we can save a round-trip when establishing TLS connections when using TFO. Modification: - Add support for client-side TCP Fast Open for Epoll, and also lowers the Linux kernel version requirements to 3.6. - When adding the SslHandler to a pipeline, if TCP Fast Open is enabled for the channel (and the channel is not already active) then start the handshake early by writing it to the outbound buffer. - An important detail to note here, is that the outbound buffer is not flushed at this point, like it would for normal handshakes. The flushing happens later as part of establishing the TCP connection. Result: - It is now possible for clients (on epoll) to open connections with TCP Fast Open. - The SslHandler automatically detects when this is the case, and now send its Client Hello message as part of the initial data in the TCP Fast Open flow when available, saving a round-trip when establishing TLS connections. Co-authored-by: Colin Godsey <crgodsey@gmail.com>
This commit is contained in:
parent
1d1087243f
commit
d60b1651fc
@ -28,6 +28,7 @@ import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.ChannelPromiseNotifier;
|
||||
@ -781,7 +782,7 @@ public class SslHandler extends ByteToMessageDecoder {
|
||||
forceFlush(ctx);
|
||||
// Explicit start handshake processing once we send the first message. This will also ensure
|
||||
// we will schedule the timeout if needed.
|
||||
startHandshakeProcessing();
|
||||
startHandshakeProcessing(true);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -1945,20 +1946,26 @@ public class SslHandler extends ByteToMessageDecoder {
|
||||
public void handlerAdded0(final ChannelHandlerContext ctx) throws Exception {
|
||||
this.ctx = ctx;
|
||||
|
||||
pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(ctx.channel(), 16);
|
||||
if (ctx.channel().isActive()) {
|
||||
startHandshakeProcessing();
|
||||
Channel channel = ctx.channel();
|
||||
pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(channel, 16);
|
||||
boolean fastOpen = Boolean.TRUE.equals(channel.config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT));
|
||||
boolean active = channel.isActive();
|
||||
if (active || fastOpen) {
|
||||
// Do not flush the handshake when TCP Fast Open is enabled, unless the channel is active.
|
||||
// With TCP Fast Open, we write to the outbound buffer before the TCP connect is established.
|
||||
// The buffer will then be flushed as part of estabilishing the connection, saving us a round-trip.
|
||||
startHandshakeProcessing(active || !fastOpen);
|
||||
}
|
||||
}
|
||||
|
||||
private void startHandshakeProcessing() {
|
||||
private void startHandshakeProcessing(boolean flushAtEnd) {
|
||||
if (!handshakeStarted) {
|
||||
handshakeStarted = true;
|
||||
if (engine.getUseClientMode()) {
|
||||
// Begin the initial handshake.
|
||||
// channelActive() event has been fired already, which means this.channelActive() will
|
||||
// not be invoked. We have to initialize here instead.
|
||||
handshake();
|
||||
handshake(flushAtEnd);
|
||||
}
|
||||
applyHandshakeTimeout();
|
||||
}
|
||||
@ -2005,15 +2012,18 @@ public class SslHandler extends ByteToMessageDecoder {
|
||||
oldHandshakePromise.addListener(new PromiseNotifier<>(newHandshakePromise));
|
||||
} else {
|
||||
handshakePromise = newHandshakePromise;
|
||||
handshake();
|
||||
handshake(true);
|
||||
applyHandshakeTimeout();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs TLS (re)negotiation.
|
||||
* @param flushAtEnd Set to {@code true} if the outbound buffer should be flushed (written to the network) at the
|
||||
* end. Set to {@code false} if the handshake will be flushed later, e.g. as part of TCP Fast Open
|
||||
* connect.
|
||||
*/
|
||||
private void handshake() {
|
||||
private void handshake(boolean flushAtEnd) {
|
||||
if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
|
||||
// Not all SSLEngine implementations support calling beginHandshake multiple times while a handshake
|
||||
// is in progress. See https://github.com/netty/netty/issues/4718.
|
||||
@ -2036,7 +2046,9 @@ public class SslHandler extends ByteToMessageDecoder {
|
||||
} catch (Throwable e) {
|
||||
setHandshakeFailure(ctx, e);
|
||||
} finally {
|
||||
forceFlush(ctx);
|
||||
if (flushAtEnd) {
|
||||
forceFlush(ctx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -2080,7 +2092,7 @@ public class SslHandler extends ByteToMessageDecoder {
|
||||
@Override
|
||||
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
|
||||
if (!startTls) {
|
||||
startHandshakeProcessing();
|
||||
startHandshakeProcessing(true);
|
||||
}
|
||||
ctx.fireChannelActive();
|
||||
}
|
||||
|
@ -17,19 +17,37 @@ package io.netty.testsuite.transport.socket;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerAdapter;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.util.concurrent.ImmediateEventExecutor;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
import org.junit.AssumptionViolatedException;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static io.netty.buffer.ByteBufUtil.writeAscii;
|
||||
import static io.netty.buffer.UnpooledByteBufAllocator.DEFAULT;
|
||||
import static io.netty.util.CharsetUtil.US_ASCII;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class SocketConnectTest extends AbstractSocketTest {
|
||||
|
||||
@ -110,8 +128,93 @@ public class SocketConnectTest extends AbstractSocketTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 3000)
|
||||
public void testWriteWithFastOpenBeforeConnect() throws Throwable {
|
||||
run();
|
||||
}
|
||||
|
||||
public void testWriteWithFastOpenBeforeConnect(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||
enableTcpFastOpen(sb, cb);
|
||||
sb.childOption(ChannelOption.AUTO_READ, true);
|
||||
cb.option(ChannelOption.AUTO_READ, true);
|
||||
|
||||
sb.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||
@Override
|
||||
protected void initChannel(SocketChannel ch) throws Exception {
|
||||
ch.pipeline().addLast(new EchoServerHandler());
|
||||
}
|
||||
});
|
||||
|
||||
Channel sc = sb.bind().sync().channel();
|
||||
connectAndVerifyDataTransfer(cb, sc);
|
||||
connectAndVerifyDataTransfer(cb, sc);
|
||||
}
|
||||
|
||||
private static void connectAndVerifyDataTransfer(Bootstrap cb, Channel sc)
|
||||
throws InterruptedException {
|
||||
BufferingClientHandler handler = new BufferingClientHandler();
|
||||
cb.handler(handler);
|
||||
ChannelFuture register = cb.register();
|
||||
Channel channel = register.sync().channel();
|
||||
ChannelFuture write = channel.write(writeAscii(DEFAULT, "[fastopen]"));
|
||||
SocketAddress remoteAddress = sc.localAddress();
|
||||
ChannelFuture connectFuture = channel.connect(remoteAddress);
|
||||
Channel cc = connectFuture.sync().channel();
|
||||
cc.writeAndFlush(writeAscii(DEFAULT, "[normal data]")).sync();
|
||||
write.sync();
|
||||
String expectedString = "[fastopen][normal data]";
|
||||
String result = handler.collectBuffer(expectedString.getBytes(US_ASCII).length);
|
||||
cc.disconnect().sync();
|
||||
assertEquals(expectedString, result);
|
||||
}
|
||||
|
||||
protected void enableTcpFastOpen(ServerBootstrap sb, Bootstrap cb) {
|
||||
throw new AssumptionViolatedException(
|
||||
"Support for testing TCP_FASTOPEN not enabled for " + StringUtil.simpleClassName(this));
|
||||
}
|
||||
|
||||
private static void assertLocalAddress(InetSocketAddress address) {
|
||||
assertTrue(address.getPort() > 0);
|
||||
assertFalse(address.getAddress().isAnyLocalAddress());
|
||||
}
|
||||
|
||||
private static class BufferingClientHandler extends ChannelHandlerAdapter {
|
||||
private final Semaphore semaphore = new Semaphore(0);
|
||||
private final ByteArrayOutputStream streamBuffer = new ByteArrayOutputStream();
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
if (msg instanceof ByteBuf) {
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
int readableBytes = buf.readableBytes();
|
||||
buf.readBytes(streamBuffer, readableBytes);
|
||||
semaphore.release(readableBytes);
|
||||
buf.release();
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unexpected message type: " + msg);
|
||||
}
|
||||
}
|
||||
|
||||
String collectBuffer(int expectedBytes) throws InterruptedException {
|
||||
semaphore.acquire(expectedBytes);
|
||||
String result = streamBuffer.toString(US_ASCII);
|
||||
streamBuffer.reset();
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
private static final class EchoServerHandler extends ChannelHandlerAdapter {
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
if (msg instanceof ByteBuf) {
|
||||
ByteBuf buffer = ctx.alloc().buffer();
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
buffer.writeBytes(buf);
|
||||
buf.release();
|
||||
ctx.channel().writeAndFlush(buffer);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unexpected message type: " + msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -61,6 +61,6 @@ public class SocketMultipleConnectTest extends AbstractSocketTest {
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return new ArrayList<>(SocketTestPermutation.INSTANCE.socket());
|
||||
return new ArrayList<>(SocketTestPermutation.INSTANCE.socketWithFastOpen());
|
||||
}
|
||||
}
|
||||
|
@ -40,7 +40,7 @@ import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class SocketReadPendingTest extends AbstractSocketTest {
|
||||
@Test(timeout = 30000)
|
||||
@Test(timeout = 60000)
|
||||
public void testReadPendingIsResetAfterEachRead() throws Throwable {
|
||||
run();
|
||||
}
|
||||
|
@ -100,6 +100,22 @@ public class SocketTestPermutation {
|
||||
return combo(sbfs, cbfs);
|
||||
}
|
||||
|
||||
public List<BootstrapComboFactory<ServerBootstrap, Bootstrap>> socketWithFastOpen() {
|
||||
// Make the list of ServerBootstrap factories.
|
||||
List<BootstrapFactory<ServerBootstrap>> sbfs = serverSocket();
|
||||
|
||||
// Make the list of Bootstrap factories.
|
||||
List<BootstrapFactory<Bootstrap>> cbfs = clientSocketWithFastOpen();
|
||||
|
||||
// Populate the combinations
|
||||
List<BootstrapComboFactory<ServerBootstrap, Bootstrap>> list = combo(sbfs, cbfs);
|
||||
|
||||
// Remove the OIO-OIO case which often leads to a dead lock by its nature.
|
||||
list.remove(list.size() - 1);
|
||||
|
||||
return list;
|
||||
}
|
||||
|
||||
public List<BootstrapComboFactory<Bootstrap, Bootstrap>> datagram(final InternetProtocolFamily family) {
|
||||
// Make the list of Bootstrap factories.
|
||||
List<BootstrapFactory<Bootstrap>> bfs = Collections.singletonList(
|
||||
@ -133,6 +149,10 @@ public class SocketTestPermutation {
|
||||
);
|
||||
}
|
||||
|
||||
public List<BootstrapFactory<Bootstrap>> clientSocketWithFastOpen() {
|
||||
return clientSocket();
|
||||
}
|
||||
|
||||
public List<BootstrapFactory<Bootstrap>> datagramSocket() {
|
||||
return Collections.singletonList(
|
||||
() -> new Bootstrap().group(nioWorkerGroup).channel(NioDatagramChannel.class)
|
||||
|
@ -124,10 +124,6 @@ static void netty_epoll_linuxsocket_setTcpFastOpen(JNIEnv* env, jclass clazz, ji
|
||||
netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_FASTOPEN, &optval, sizeof(optval));
|
||||
}
|
||||
|
||||
static void netty_epoll_linuxsocket_setTcpFastOpenConnect(JNIEnv* env, jclass clazz, jint fd, jint optval) {
|
||||
netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_FASTOPEN_CONNECT, &optval, sizeof(optval));
|
||||
}
|
||||
|
||||
static void netty_epoll_linuxsocket_setTcpKeepIdle(JNIEnv* env, jclass clazz, jint fd, jint optval) {
|
||||
netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
|
||||
}
|
||||
@ -596,20 +592,6 @@ static jint netty_epoll_linuxsocket_isTcpQuickAck(JNIEnv* env, jclass clazz, jin
|
||||
return optval;
|
||||
}
|
||||
|
||||
static jint netty_epoll_linuxsocket_isTcpFastOpenConnect(JNIEnv* env, jclass clazz, jint fd) {
|
||||
int optval;
|
||||
// We call netty_unix_socket_getOption0 directly so we can handle ENOPROTOOPT by ourself.
|
||||
if (netty_unix_socket_getOption0(fd, IPPROTO_TCP, TCP_FASTOPEN_CONNECT, &optval, sizeof(optval)) == -1) {
|
||||
if (errno == ENOPROTOOPT) {
|
||||
// Not supported by the system, so just return 0.
|
||||
return 0;
|
||||
}
|
||||
netty_unix_socket_getOptionHandleError(env, errno);
|
||||
return -1;
|
||||
}
|
||||
return optval;
|
||||
}
|
||||
|
||||
static jint netty_epoll_linuxsocket_getTcpNotSentLowAt(JNIEnv* env, jclass clazz, jint fd) {
|
||||
int optval;
|
||||
if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &optval, sizeof(optval)) == -1) {
|
||||
@ -681,8 +663,6 @@ static const JNINativeMethod fixed_method_table[] = {
|
||||
{ "getTcpNotSentLowAt", "(I)I", (void *) netty_epoll_linuxsocket_getTcpNotSentLowAt },
|
||||
{ "isTcpQuickAck", "(I)I", (void *) netty_epoll_linuxsocket_isTcpQuickAck },
|
||||
{ "setTcpFastOpen", "(II)V", (void *) netty_epoll_linuxsocket_setTcpFastOpen },
|
||||
{ "setTcpFastOpenConnect", "(II)V", (void *) netty_epoll_linuxsocket_setTcpFastOpenConnect },
|
||||
{ "isTcpFastOpenConnect", "(I)I", (void *) netty_epoll_linuxsocket_isTcpFastOpenConnect },
|
||||
{ "setTcpKeepIdle", "(II)V", (void *) netty_epoll_linuxsocket_setTcpKeepIdle },
|
||||
{ "setTcpKeepIntvl", "(II)V", (void *) netty_epoll_linuxsocket_setTcpKeepIntvl },
|
||||
{ "setTcpKeepCnt", "(II)V", (void *) netty_epoll_linuxsocket_setTcpKeepCnt },
|
||||
|
@ -34,6 +34,7 @@ import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
|
||||
import io.netty.channel.socket.SocketChannelConfig;
|
||||
import io.netty.channel.unix.FileDescriptor;
|
||||
import io.netty.channel.unix.IovArray;
|
||||
import io.netty.channel.unix.Socket;
|
||||
import io.netty.channel.unix.UnixChannel;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
@ -370,6 +371,43 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
||||
return WRITE_STATUS_SNDBUF_FULL;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write bytes to the socket, with or without a remote address.
|
||||
* Used for datagram and TCP client fast open writes.
|
||||
*/
|
||||
final long doWriteOrSendBytes(ByteBuf data, InetSocketAddress remoteAddress, boolean fastOpen)
|
||||
throws IOException {
|
||||
assert !(fastOpen && remoteAddress == null) : "fastOpen requires a remote address";
|
||||
if (data.hasMemoryAddress()) {
|
||||
long memoryAddress = data.memoryAddress();
|
||||
if (remoteAddress == null) {
|
||||
return socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
|
||||
}
|
||||
return socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
|
||||
remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
|
||||
}
|
||||
|
||||
if (data.nioBufferCount() > 1) {
|
||||
IovArray array = registration().cleanIovArray();
|
||||
array.add(data, data.readerIndex(), data.readableBytes());
|
||||
int cnt = array.count();
|
||||
assert cnt != 0;
|
||||
|
||||
if (remoteAddress == null) {
|
||||
return socket.writevAddresses(array.memoryAddress(0), cnt);
|
||||
}
|
||||
return socket.sendToAddresses(array.memoryAddress(0), cnt,
|
||||
remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
|
||||
}
|
||||
|
||||
ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
|
||||
if (remoteAddress == null) {
|
||||
return socket.write(nioData, nioData.position(), nioData.limit());
|
||||
}
|
||||
return socket.sendTo(nioData, nioData.position(), nioData.limit(),
|
||||
remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
|
||||
}
|
||||
|
||||
protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
|
||||
boolean readPending;
|
||||
boolean maybeMoreDataToRead;
|
||||
@ -718,7 +756,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
||||
return connected;
|
||||
}
|
||||
|
||||
private boolean doConnect0(SocketAddress remote) throws Exception {
|
||||
boolean doConnect0(SocketAddress remote) throws Exception {
|
||||
boolean success = false;
|
||||
try {
|
||||
boolean connected = socket.connect(remote);
|
||||
|
@ -33,8 +33,12 @@ public final class EpollChannelOption<T> extends UnixChannelOption<T> {
|
||||
public static final ChannelOption<Boolean> IP_TRANSPARENT = valueOf("IP_TRANSPARENT");
|
||||
public static final ChannelOption<Boolean> IP_RECVORIGDSTADDR = valueOf("IP_RECVORIGDSTADDR");
|
||||
public static final ChannelOption<Integer> TCP_FASTOPEN = valueOf(EpollChannelOption.class, "TCP_FASTOPEN");
|
||||
public static final ChannelOption<Boolean> TCP_FASTOPEN_CONNECT =
|
||||
valueOf(EpollChannelOption.class, "TCP_FASTOPEN_CONNECT");
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link ChannelOption#TCP_FASTOPEN_CONNECT} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public static final ChannelOption<Boolean> TCP_FASTOPEN_CONNECT = ChannelOption.TCP_FASTOPEN_CONNECT;
|
||||
public static final ChannelOption<Integer> TCP_DEFER_ACCEPT =
|
||||
ChannelOption.valueOf(EpollChannelOption.class, "TCP_DEFER_ACCEPT");
|
||||
public static final ChannelOption<Boolean> TCP_QUICKACK = valueOf(EpollChannelOption.class, "TCP_QUICKACK");
|
||||
|
@ -33,7 +33,6 @@ import io.netty.channel.socket.InternetProtocolFamily;
|
||||
import io.netty.channel.unix.DatagramSocketAddress;
|
||||
import io.netty.channel.unix.Errors;
|
||||
import io.netty.channel.unix.Errors.NativeIoException;
|
||||
import io.netty.channel.unix.IovArray;
|
||||
import io.netty.channel.unix.Socket;
|
||||
import io.netty.channel.unix.UnixChannelUtil;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
@ -350,7 +349,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
||||
|
||||
private boolean doWriteMessage(Object msg) throws Exception {
|
||||
final ByteBuf data;
|
||||
InetSocketAddress remoteAddress;
|
||||
final InetSocketAddress remoteAddress;
|
||||
if (msg instanceof AddressedEnvelope) {
|
||||
@SuppressWarnings("unchecked")
|
||||
AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
|
||||
@ -367,38 +366,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
||||
return true;
|
||||
}
|
||||
|
||||
final long writtenBytes;
|
||||
if (data.hasMemoryAddress()) {
|
||||
long memoryAddress = data.memoryAddress();
|
||||
if (remoteAddress == null) {
|
||||
writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
|
||||
} else {
|
||||
writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
|
||||
remoteAddress.getAddress(), remoteAddress.getPort());
|
||||
}
|
||||
} else if (data.nioBufferCount() > 1) {
|
||||
IovArray array = registration().cleanIovArray();
|
||||
array.add(data, data.readerIndex(), data.readableBytes());
|
||||
int cnt = array.count();
|
||||
assert cnt != 0;
|
||||
|
||||
if (remoteAddress == null) {
|
||||
writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
|
||||
} else {
|
||||
writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt,
|
||||
remoteAddress.getAddress(), remoteAddress.getPort());
|
||||
}
|
||||
} else {
|
||||
ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
|
||||
if (remoteAddress == null) {
|
||||
writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
|
||||
} else {
|
||||
writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
|
||||
remoteAddress.getAddress(), remoteAddress.getPort());
|
||||
}
|
||||
}
|
||||
|
||||
return writtenBytes > 0;
|
||||
return doWriteOrSendBytes(data, remoteAddress, false) > 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -15,8 +15,10 @@
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.socket.ServerSocketChannel;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
@ -25,6 +27,7 @@ import io.netty.util.concurrent.GlobalEventExecutor;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
@ -112,6 +115,29 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
|
||||
return new EpollSocketChannelUnsafe();
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean doConnect0(SocketAddress remote) throws Exception {
|
||||
if (Native.IS_SUPPORTING_TCP_FASTOPEN && config.isTcpFastOpenConnect()) {
|
||||
ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
|
||||
outbound.addFlush();
|
||||
Object curr;
|
||||
if ((curr = outbound.current()) instanceof ByteBuf) {
|
||||
ByteBuf initialData = (ByteBuf) curr;
|
||||
// If no cookie is present, the write fails with EINPROGRESS and this call basically
|
||||
// becomes a normal async connect. All writes will be sent normally afterwards.
|
||||
long localFlushedAmount = doWriteOrSendBytes(
|
||||
initialData, (InetSocketAddress) remote, true);
|
||||
if (localFlushedAmount > 0) {
|
||||
// We had a cookie and our fast-open proceeded. Remove written data
|
||||
// then continue with normal TCP operation.
|
||||
outbound.removeBytes(localFlushedAmount);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return super.doConnect0(remote);
|
||||
}
|
||||
|
||||
private final class EpollSocketChannelUnsafe extends EpollStreamUnsafe {
|
||||
@Override
|
||||
protected Executor prepareToClose() {
|
||||
|
@ -37,6 +37,7 @@ import static io.netty.channel.ChannelOption.SO_SNDBUF;
|
||||
import static io.netty.channel.ChannelOption.TCP_NODELAY;
|
||||
|
||||
public final class EpollSocketChannelConfig extends EpollDuplexChannelConfig implements SocketChannelConfig {
|
||||
private volatile boolean tcpFastopen;
|
||||
|
||||
/**
|
||||
* Creates a new instance.
|
||||
@ -58,7 +59,7 @@ public final class EpollSocketChannelConfig extends EpollDuplexChannelConfig imp
|
||||
EpollChannelOption.TCP_CORK, EpollChannelOption.TCP_NOTSENT_LOWAT,
|
||||
EpollChannelOption.TCP_KEEPCNT, EpollChannelOption.TCP_KEEPIDLE, EpollChannelOption.TCP_KEEPINTVL,
|
||||
EpollChannelOption.TCP_MD5SIG, EpollChannelOption.TCP_QUICKACK, EpollChannelOption.IP_TRANSPARENT,
|
||||
EpollChannelOption.TCP_FASTOPEN_CONNECT, EpollChannelOption.SO_BUSY_POLL);
|
||||
ChannelOption.TCP_FASTOPEN_CONNECT, EpollChannelOption.SO_BUSY_POLL);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@ -109,7 +110,7 @@ public final class EpollSocketChannelConfig extends EpollDuplexChannelConfig imp
|
||||
if (option == EpollChannelOption.IP_TRANSPARENT) {
|
||||
return (T) Boolean.valueOf(isIpTransparent());
|
||||
}
|
||||
if (option == EpollChannelOption.TCP_FASTOPEN_CONNECT) {
|
||||
if (option == ChannelOption.TCP_FASTOPEN_CONNECT) {
|
||||
return (T) Boolean.valueOf(isTcpFastOpenConnect());
|
||||
}
|
||||
if (option == EpollChannelOption.SO_BUSY_POLL) {
|
||||
@ -156,7 +157,7 @@ public final class EpollSocketChannelConfig extends EpollDuplexChannelConfig imp
|
||||
setTcpMd5Sig(m);
|
||||
} else if (option == EpollChannelOption.TCP_QUICKACK) {
|
||||
setTcpQuickAck((Boolean) value);
|
||||
} else if (option == EpollChannelOption.TCP_FASTOPEN_CONNECT) {
|
||||
} else if (option == ChannelOption.TCP_FASTOPEN_CONNECT) {
|
||||
setTcpFastOpenConnect((Boolean) value);
|
||||
} else if (option == EpollChannelOption.SO_BUSY_POLL) {
|
||||
setSoBusyPoll((Integer) value);
|
||||
@ -544,29 +545,21 @@ public final class EpollSocketChannelConfig extends EpollDuplexChannelConfig imp
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@code TCP_FASTOPEN_CONNECT} option on the socket. Requires Linux kernel 4.11 or later.
|
||||
* See
|
||||
* <a href="https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=19f6d3f3">this commit</a>
|
||||
* for more details.
|
||||
* Enables client TCP fast open. {@code TCP_FASTOPEN_CONNECT} normally
|
||||
* requires Linux kernel 4.11 or later, so instead we use the traditional fast open
|
||||
* client socket mechanics that work with kernel 3.6 and later. See this
|
||||
* <a href="https://lwn.net/Articles/508865/">LWN article</a> for more info.
|
||||
*/
|
||||
public EpollSocketChannelConfig setTcpFastOpenConnect(boolean fastOpenConnect) {
|
||||
try {
|
||||
((EpollSocketChannel) channel).socket.setTcpFastOpenConnect(fastOpenConnect);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
tcpFastopen = fastOpenConnect;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code true} if {@code TCP_FASTOPEN_CONNECT} is enabled, {@code false} otherwise.
|
||||
* Returns {@code true} if TCP fast open is enabled, {@code false} otherwise.
|
||||
*/
|
||||
public boolean isTcpFastOpenConnect() {
|
||||
try {
|
||||
return ((EpollSocketChannel) channel).socket.isTcpFastOpenConnect();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
return tcpFastopen;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -179,14 +179,6 @@ final class LinuxSocket extends Socket {
|
||||
setTcpFastOpen(intValue(), tcpFastopenBacklog);
|
||||
}
|
||||
|
||||
void setTcpFastOpenConnect(boolean tcpFastOpenConnect) throws IOException {
|
||||
setTcpFastOpenConnect(intValue(), tcpFastOpenConnect ? 1 : 0);
|
||||
}
|
||||
|
||||
boolean isTcpFastOpenConnect() throws IOException {
|
||||
return isTcpFastOpenConnect(intValue()) != 0;
|
||||
}
|
||||
|
||||
void setTcpKeepIdle(int seconds) throws IOException {
|
||||
setTcpKeepIdle(intValue(), seconds);
|
||||
}
|
||||
@ -369,7 +361,6 @@ final class LinuxSocket extends Socket {
|
||||
private static native int isIpRecvOrigDestAddr(int fd) throws IOException;
|
||||
private static native void getTcpInfo(int fd, long[] array) throws IOException;
|
||||
private static native PeerCredentials getPeerCredentials(int fd) throws IOException;
|
||||
private static native int isTcpFastOpenConnect(int fd) throws IOException;
|
||||
|
||||
private static native void setTcpDeferAccept(int fd, int deferAccept) throws IOException;
|
||||
private static native void setTcpQuickAck(int fd, int quickAck) throws IOException;
|
||||
@ -377,7 +368,6 @@ final class LinuxSocket extends Socket {
|
||||
private static native void setSoBusyPoll(int fd, int loopMicros) throws IOException;
|
||||
private static native void setTcpNotSentLowAt(int fd, int tcpNotSentLowAt) throws IOException;
|
||||
private static native void setTcpFastOpen(int fd, int tcpFastopenBacklog) throws IOException;
|
||||
private static native void setTcpFastOpenConnect(int fd, int tcpFastOpenConnect) throws IOException;
|
||||
private static native void setTcpKeepIdle(int fd, int seconds) throws IOException;
|
||||
private static native void setTcpKeepIntvl(int fd, int seconds) throws IOException;
|
||||
private static native void setTcpKeepCnt(int fd, int probes) throws IOException;
|
||||
|
@ -26,7 +26,7 @@ import java.util.List;
|
||||
public class EpollCompositeBufferGatheringWriteTest extends CompositeBufferGatheringWriteTest {
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -25,6 +25,6 @@ import java.util.List;
|
||||
public class EpollSocketAutoReadTest extends SocketAutoReadTest {
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
|
||||
}
|
||||
}
|
||||
|
@ -24,6 +24,6 @@ import java.util.List;
|
||||
public class EpollSocketChannelNotYetConnectedTest extends SocketChannelNotYetConnectedTest {
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapFactory<Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.clientSocket();
|
||||
return EpollSocketTestPermutation.INSTANCE.clientSocketWithFastOpen();
|
||||
}
|
||||
}
|
||||
|
@ -25,6 +25,6 @@ import java.util.List;
|
||||
public class EpollSocketCloseForciblyTest extends SocketCloseForciblyTest {
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen();
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,6 @@ import java.util.List;
|
||||
public class EpollSocketConditionalWritabilityTest extends SocketConditionalWritabilityTest {
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||
import io.netty.testsuite.transport.socket.SocketConnectTest;
|
||||
|
||||
@ -26,6 +27,12 @@ public class EpollSocketConnectTest extends SocketConnectTest {
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void enableTcpFastOpen(ServerBootstrap sb, Bootstrap cb) {
|
||||
sb.childOption(EpollChannelOption.TCP_FASTOPEN, 5);
|
||||
cb.option(ChannelOption.TCP_FASTOPEN_CONNECT, true);
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,6 @@ import java.util.List;
|
||||
public class EpollSocketDataReadInitialStateTest extends SocketDataReadInitialStateTest {
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,6 @@ public class EpollSocketEchoTest extends SocketEchoTest {
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,6 @@ import java.util.List;
|
||||
public class EpollSocketExceptionHandlingTest extends SocketExceptionHandlingTest {
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,6 @@ public class EpollSocketFileRegionTest extends SocketFileRegionTest {
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,6 @@ public class EpollSocketFixedLengthEchoTest extends SocketFixedLengthEchoTest {
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,6 @@ public class EpollSocketGatheringWriteTest extends SocketGatheringWriteTest {
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen();
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,6 @@ import java.util.List;
|
||||
public class EpollSocketHalfClosed extends SocketHalfClosedTest {
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen();
|
||||
}
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ public class EpollSocketMultipleConnectTest extends SocketMultipleConnectTest {
|
||||
List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> factories
|
||||
= new ArrayList<>();
|
||||
for (TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap> comboFactory
|
||||
: EpollSocketTestPermutation.INSTANCE.socket()) {
|
||||
: EpollSocketTestPermutation.INSTANCE.socketWithFastOpen()) {
|
||||
factories.add(comboFactory);
|
||||
}
|
||||
return factories;
|
||||
|
@ -26,6 +26,6 @@ public class EpollSocketObjectEchoTest extends SocketObjectEchoTest {
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,6 @@ import java.util.List;
|
||||
public class EpollSocketReadPendingTest extends SocketReadPendingTest {
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
|
||||
}
|
||||
}
|
||||
|
@ -32,7 +32,7 @@ import static org.junit.Assert.assertTrue;
|
||||
public class EpollSocketRstTest extends SocketRstTest {
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -31,6 +31,6 @@ public class EpollSocketSslClientRenegotiateTest extends SocketSslClientRenegoti
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
|
||||
}
|
||||
}
|
||||
|
@ -36,6 +36,6 @@ public class EpollSocketSslEchoTest extends SocketSslEchoTest {
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
|
||||
}
|
||||
}
|
||||
|
@ -31,6 +31,6 @@ public class EpollSocketSslGreetingTest extends SocketSslGreetingTest {
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
|
||||
}
|
||||
}
|
||||
|
@ -31,6 +31,6 @@ public class EpollSocketSslSessionReuseTest extends SocketSslSessionReuseTest {
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
|
||||
}
|
||||
}
|
||||
|
@ -31,6 +31,6 @@ public class EpollSocketStartTlsTest extends SocketStartTlsTest {
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,6 @@ public class EpollSocketStringEchoTest extends SocketStringEchoTest {
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.socket();
|
||||
return EpollSocketTestPermutation.INSTANCE.socketWithFastOpen();
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFactory;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.MultithreadEventLoopGroup;
|
||||
@ -57,10 +58,19 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
|
||||
EpollHandler.newFactory());
|
||||
|
||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollSocketTestPermutation.class);
|
||||
// Constants describing if/how TCP Fast Open is allowed to work on Linux:
|
||||
private static final int TFO_ENABLED_CLIENT = 1;
|
||||
private static final int TFO_ENABLED_SERVER = 2;
|
||||
|
||||
@Override
|
||||
public List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> socket() {
|
||||
List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> list =
|
||||
combo(serverSocket(), clientSocketWithFastOpen());
|
||||
|
||||
return list.subList(0, list.size() - 1); // Exclude NIO x NIO test
|
||||
}
|
||||
|
||||
public List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> socketWithoutFastOpen() {
|
||||
List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> list =
|
||||
combo(serverSocket(), clientSocket());
|
||||
|
||||
@ -69,13 +79,12 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
|
||||
return list;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public List<BootstrapFactory<ServerBootstrap>> serverSocket() {
|
||||
List<BootstrapFactory<ServerBootstrap>> toReturn = new ArrayList<>();
|
||||
toReturn.add(() -> new ServerBootstrap().group(EPOLL_BOSS_GROUP, EPOLL_WORKER_GROUP)
|
||||
.channel(EpollServerSocketChannel.class));
|
||||
if (isServerFastOpen()) {
|
||||
if (isFastOpen()) {
|
||||
toReturn.add(() -> {
|
||||
ServerBootstrap serverBootstrap = new ServerBootstrap().group(EPOLL_BOSS_GROUP, EPOLL_WORKER_GROUP)
|
||||
.channel(EpollServerSocketChannel.class);
|
||||
@ -89,7 +98,6 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
|
||||
return toReturn;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public List<BootstrapFactory<Bootstrap>> clientSocket() {
|
||||
return Arrays.asList(
|
||||
@ -98,11 +106,24 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<BootstrapFactory<Bootstrap>> clientSocketWithFastOpen() {
|
||||
if (isFastOpen()) {
|
||||
// Keep NIO fixture last.
|
||||
return Arrays.asList(
|
||||
() -> new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollSocketChannel.class),
|
||||
() -> new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollSocketChannel.class)
|
||||
.option(ChannelOption.TCP_FASTOPEN_CONNECT, true),
|
||||
() -> new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class)
|
||||
);
|
||||
}
|
||||
return clientSocket();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> datagram(
|
||||
final InternetProtocolFamily family) {
|
||||
// Make the list of Bootstrap factories.
|
||||
@SuppressWarnings("unchecked")
|
||||
List<BootstrapFactory<Bootstrap>> bfs = Arrays.asList(
|
||||
() -> new Bootstrap().group(nioWorkerGroup).channelFactory(new ChannelFactory<Channel>() {
|
||||
@Override
|
||||
@ -136,7 +157,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
|
||||
Collections.singletonList(datagramBootstrapFactory(family)));
|
||||
}
|
||||
|
||||
private BootstrapFactory<Bootstrap> datagramBootstrapFactory(final InternetProtocolFamily family) {
|
||||
private static BootstrapFactory<Bootstrap> datagramBootstrapFactory(final InternetProtocolFamily family) {
|
||||
return () -> new Bootstrap().group(EPOLL_WORKER_GROUP).channelFactory(new ChannelFactory<Channel>() {
|
||||
@Override
|
||||
public Channel newChannel(EventLoop eventLoop) {
|
||||
@ -151,10 +172,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
|
||||
}
|
||||
|
||||
public List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> domainSocket() {
|
||||
|
||||
List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> list =
|
||||
combo(serverDomainSocket(), clientDomainSocket());
|
||||
return list;
|
||||
return combo(serverDomainSocket(), clientDomainSocket());
|
||||
}
|
||||
|
||||
public List<BootstrapFactory<ServerBootstrap>> serverDomainSocket() {
|
||||
@ -177,8 +195,8 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
|
||||
);
|
||||
}
|
||||
|
||||
public boolean isServerFastOpen() {
|
||||
return AccessController.doPrivileged((PrivilegedAction<Integer>) () -> {
|
||||
public boolean isFastOpen() {
|
||||
int tfoEnabled = AccessController.doPrivileged((PrivilegedAction<Integer>) () -> {
|
||||
int fastopen = 0;
|
||||
File file = new File("/proc/sys/net/ipv4/tcp_fastopen");
|
||||
if (file.exists()) {
|
||||
@ -186,9 +204,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
|
||||
try {
|
||||
in = new BufferedReader(new FileReader(file));
|
||||
fastopen = Integer.parseInt(in.readLine());
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("{}: {}", file, fastopen);
|
||||
}
|
||||
logger.debug("{}: {}", file, fastopen);
|
||||
} catch (Exception e) {
|
||||
logger.debug("Failed to get TCP_FASTOPEN from: {}", file, e);
|
||||
} finally {
|
||||
@ -201,12 +217,12 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("{}: {} (non-existent)", file, fastopen);
|
||||
}
|
||||
logger.debug("{}: {} (non-existent)", file, fastopen);
|
||||
}
|
||||
return fastopen;
|
||||
}) == 3;
|
||||
});
|
||||
// TCP Fast Open needs to be enabled for both clients and servers, before we can test our intergration with it.
|
||||
return tfoEnabled == TFO_ENABLED_CLIENT + TFO_ENABLED_SERVER;
|
||||
}
|
||||
|
||||
public static DomainSocketAddress newSocketAddress() {
|
||||
|
@ -25,6 +25,6 @@ public class EpollWriteBeforeRegisteredTest extends WriteBeforeRegisteredTest {
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapFactory<Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.clientSocket();
|
||||
return EpollSocketTestPermutation.INSTANCE.clientSocketWithFastOpen();
|
||||
}
|
||||
}
|
||||
|
@ -39,6 +39,11 @@
|
||||
#define SO_REUSEPORT 15
|
||||
#endif /* SO_REUSEPORT */
|
||||
|
||||
// MSG_FASTOPEN is defined in linux 3.6. We define this here so older kernels can compile.
|
||||
#ifndef MSG_FASTOPEN
|
||||
#define MSG_FASTOPEN 0x20000000
|
||||
#endif
|
||||
|
||||
static jclass datagramSocketAddressClass = NULL;
|
||||
static jmethodID datagramSocketAddrMethodId = NULL;
|
||||
static jmethodID inetSocketAddrMethodId = NULL;
|
||||
@ -309,7 +314,7 @@ int netty_unix_socket_initSockaddr(JNIEnv* env, jboolean ipv6, jbyteArray addres
|
||||
return 0;
|
||||
}
|
||||
|
||||
static jint _sendTo(JNIEnv* env, jint fd, jboolean ipv6, void* buffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) {
|
||||
static jint _sendTo(JNIEnv* env, jint fd, jboolean ipv6, void* buffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port, jint flags) {
|
||||
struct sockaddr_storage addr;
|
||||
socklen_t addrSize;
|
||||
if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr, &addrSize) == -1) {
|
||||
@ -319,7 +324,7 @@ static jint _sendTo(JNIEnv* env, jint fd, jboolean ipv6, void* buffer, jint pos,
|
||||
ssize_t res;
|
||||
int err;
|
||||
do {
|
||||
res = sendto(fd, buffer + pos, (size_t) (limit - pos), 0, (struct sockaddr*) &addr, addrSize);
|
||||
res = sendto(fd, buffer + pos, (size_t) (limit - pos), flags, (struct sockaddr*) &addr, addrSize);
|
||||
// keep on writing if it was interrupted
|
||||
} while (res == -1 && ((err = errno) == EINTR));
|
||||
|
||||
@ -623,16 +628,16 @@ static jint netty_unix_socket_newSocketDomainFd(JNIEnv* env, jclass clazz) {
|
||||
return fd;
|
||||
}
|
||||
|
||||
static jint netty_unix_socket_sendTo(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) {
|
||||
static jint netty_unix_socket_sendTo(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port, jint flags) {
|
||||
// We check that GetDirectBufferAddress will not return NULL in OnLoad
|
||||
return _sendTo(env, fd, ipv6, (*env)->GetDirectBufferAddress(env, jbuffer), pos, limit, address, scopeId, port);
|
||||
return _sendTo(env, fd, ipv6, (*env)->GetDirectBufferAddress(env, jbuffer), pos, limit, address, scopeId, port, flags);
|
||||
}
|
||||
|
||||
static jint netty_unix_socket_sendToAddress(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) {
|
||||
return _sendTo(env, fd, ipv6, (void *) (intptr_t) memoryAddress, pos, limit, address, scopeId, port);
|
||||
static jint netty_unix_socket_sendToAddress(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port, jint flags) {
|
||||
return _sendTo(env, fd, ipv6, (void *) (intptr_t) memoryAddress, pos, limit, address, scopeId, port, flags);
|
||||
}
|
||||
|
||||
static jint netty_unix_socket_sendToAddresses(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port) {
|
||||
static jint netty_unix_socket_sendToAddresses(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port, jint flags) {
|
||||
struct sockaddr_storage addr;
|
||||
socklen_t addrSize;
|
||||
if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr, &addrSize) == -1) {
|
||||
@ -648,7 +653,7 @@ static jint netty_unix_socket_sendToAddresses(JNIEnv* env, jclass clazz, jint fd
|
||||
ssize_t res;
|
||||
int err;
|
||||
do {
|
||||
res = sendmsg(fd, &m, 0);
|
||||
res = sendmsg(fd, &m, flags);
|
||||
// keep on writing if it was interrupted
|
||||
} while (res == -1 && ((err = errno) == EINTR));
|
||||
|
||||
@ -965,6 +970,9 @@ static jint netty_unix_socket_isBroadcast(JNIEnv* env, jclass clazz, jint fd) {
|
||||
return optval;
|
||||
}
|
||||
|
||||
static jint netty_unit_socket_msgFastopen(JNIEnv* env, jclass clazz) {
|
||||
return MSG_FASTOPEN;
|
||||
}
|
||||
|
||||
// JNI Registered Methods End
|
||||
|
||||
@ -982,9 +990,9 @@ static const JNINativeMethod fixed_method_table[] = {
|
||||
{ "newSocketDgramFd", "(Z)I", (void *) netty_unix_socket_newSocketDgramFd },
|
||||
{ "newSocketStreamFd", "(Z)I", (void *) netty_unix_socket_newSocketStreamFd },
|
||||
{ "newSocketDomainFd", "()I", (void *) netty_unix_socket_newSocketDomainFd },
|
||||
{ "sendTo", "(IZLjava/nio/ByteBuffer;II[BII)I", (void *) netty_unix_socket_sendTo },
|
||||
{ "sendToAddress", "(IZJII[BII)I", (void *) netty_unix_socket_sendToAddress },
|
||||
{ "sendToAddresses", "(IZJI[BII)I", (void *) netty_unix_socket_sendToAddresses },
|
||||
{ "sendTo", "(IZLjava/nio/ByteBuffer;II[BIII)I", (void *) netty_unix_socket_sendTo },
|
||||
{ "sendToAddress", "(IZJII[BIII)I", (void *) netty_unix_socket_sendToAddress },
|
||||
{ "sendToAddresses", "(IZJI[BIII)I", (void *) netty_unix_socket_sendToAddresses },
|
||||
// "recvFrom" has a dynamic signature
|
||||
// "recvFromAddress" has a dynamic signature
|
||||
{ "recvFd", "(I)I", (void *) netty_unix_socket_recvFd },
|
||||
@ -1012,7 +1020,8 @@ static const JNINativeMethod fixed_method_table[] = {
|
||||
{ "getSoError", "(I)I", (void *) netty_unix_socket_getSoError },
|
||||
{ "initialize", "(Z)V", (void *) netty_unix_socket_initialize },
|
||||
{ "isIPv6Preferred", "()Z", (void *) netty_unix_socket_isIPv6Preferred },
|
||||
{ "isIPv6", "(I)Z", (void *) netty_unix_socket_isIPv6 }
|
||||
{ "isIPv6", "(I)Z", (void *) netty_unix_socket_isIPv6 },
|
||||
{ "msgFastopen", "()I", (void *) netty_unit_socket_msgFastopen }
|
||||
};
|
||||
static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]);
|
||||
|
||||
|
@ -113,29 +113,10 @@ public class Socket extends FileDescriptor {
|
||||
}
|
||||
|
||||
public final int sendTo(ByteBuffer buf, int pos, int limit, InetAddress addr, int port) throws IOException {
|
||||
// just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected
|
||||
// to be called frequently
|
||||
byte[] address;
|
||||
int scopeId;
|
||||
if (addr instanceof Inet6Address) {
|
||||
address = addr.getAddress();
|
||||
scopeId = ((Inet6Address) addr).getScopeId();
|
||||
} else {
|
||||
// convert to ipv4 mapped ipv6 address;
|
||||
scopeId = 0;
|
||||
address = ipv4MappedIpv6Address(addr.getAddress());
|
||||
}
|
||||
int res = sendTo(fd, useIpv6(addr), buf, pos, limit, address, scopeId, port);
|
||||
if (res >= 0) {
|
||||
return res;
|
||||
}
|
||||
if (res == ERROR_ECONNREFUSED_NEGATIVE) {
|
||||
throw new PortUnreachableException("sendTo failed");
|
||||
}
|
||||
return ioResult("sendTo", res);
|
||||
return sendTo(buf, pos, limit, addr, port, false);
|
||||
}
|
||||
|
||||
public final int sendToAddress(long memoryAddress, int pos, int limit, InetAddress addr, int port)
|
||||
public final int sendTo(ByteBuffer buf, int pos, int limit, InetAddress addr, int port, boolean fastOpen)
|
||||
throws IOException {
|
||||
// just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected
|
||||
// to be called frequently
|
||||
@ -149,17 +130,30 @@ public class Socket extends FileDescriptor {
|
||||
scopeId = 0;
|
||||
address = ipv4MappedIpv6Address(addr.getAddress());
|
||||
}
|
||||
int res = sendToAddress(fd, useIpv6(addr), memoryAddress, pos, limit, address, scopeId, port);
|
||||
int flags = fastOpen ? msgFastopen() : 0;
|
||||
int res = sendTo(fd, useIpv6(addr), buf, pos, limit, address, scopeId, port, flags);
|
||||
if (res >= 0) {
|
||||
return res;
|
||||
}
|
||||
if (res == ERROR_ECONNREFUSED_NEGATIVE) {
|
||||
throw new PortUnreachableException("sendToAddress failed");
|
||||
if (res == ERRNO_EINPROGRESS_NEGATIVE && fastOpen) {
|
||||
// This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection.
|
||||
// In this case, our TCP connection will be established normally, but no data was transmitted at this time.
|
||||
// We'll just transmit the data with normal writes later.
|
||||
return 0;
|
||||
}
|
||||
return ioResult("sendToAddress", res);
|
||||
if (res == ERROR_ECONNREFUSED_NEGATIVE) {
|
||||
throw new PortUnreachableException("sendTo failed");
|
||||
}
|
||||
return ioResult("sendTo", res);
|
||||
}
|
||||
|
||||
public final int sendToAddresses(long memoryAddress, int length, InetAddress addr, int port) throws IOException {
|
||||
public final int sendToAddress(long memoryAddress, int pos, int limit, InetAddress addr, int port)
|
||||
throws IOException {
|
||||
return sendToAddress(memoryAddress, pos, limit, addr, port, false);
|
||||
}
|
||||
|
||||
public final int sendToAddress(long memoryAddress, int pos, int limit, InetAddress addr, int port,
|
||||
boolean fastOpen) throws IOException {
|
||||
// just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected
|
||||
// to be called frequently
|
||||
byte[] address;
|
||||
@ -172,11 +166,52 @@ public class Socket extends FileDescriptor {
|
||||
scopeId = 0;
|
||||
address = ipv4MappedIpv6Address(addr.getAddress());
|
||||
}
|
||||
int res = sendToAddresses(fd, useIpv6(addr), memoryAddress, length, address, scopeId, port);
|
||||
int flags = fastOpen ? msgFastopen() : 0;
|
||||
int res = sendToAddress(fd, useIpv6(addr), memoryAddress, pos, limit, address, scopeId, port, flags);
|
||||
if (res >= 0) {
|
||||
return res;
|
||||
}
|
||||
if (res == ERRNO_EINPROGRESS_NEGATIVE && fastOpen) {
|
||||
// This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection.
|
||||
// In this case, our TCP connection will be established normally, but no data was transmitted at this time.
|
||||
// We'll just transmit the data with normal writes later.
|
||||
return 0;
|
||||
}
|
||||
if (res == ERROR_ECONNREFUSED_NEGATIVE) {
|
||||
throw new PortUnreachableException("sendToAddress failed");
|
||||
}
|
||||
return ioResult("sendToAddress", res);
|
||||
}
|
||||
|
||||
public final int sendToAddresses(long memoryAddress, int length, InetAddress addr, int port) throws IOException {
|
||||
return sendToAddresses(memoryAddress, length, addr, port, false);
|
||||
}
|
||||
|
||||
public final int sendToAddresses(long memoryAddress, int length, InetAddress addr, int port, boolean fastOpen)
|
||||
throws IOException {
|
||||
// just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected
|
||||
// to be called frequently
|
||||
byte[] address;
|
||||
int scopeId;
|
||||
if (addr instanceof Inet6Address) {
|
||||
address = addr.getAddress();
|
||||
scopeId = ((Inet6Address) addr).getScopeId();
|
||||
} else {
|
||||
// convert to ipv4 mapped ipv6 address;
|
||||
scopeId = 0;
|
||||
address = ipv4MappedIpv6Address(addr.getAddress());
|
||||
}
|
||||
int flags = fastOpen ? msgFastopen() : 0;
|
||||
int res = sendToAddresses(fd, useIpv6(addr), memoryAddress, length, address, scopeId, port, flags);
|
||||
if (res >= 0) {
|
||||
return res;
|
||||
}
|
||||
if (res == ERRNO_EINPROGRESS_NEGATIVE && fastOpen) {
|
||||
// This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection.
|
||||
// In this case, our TCP connection will be established normally, but no data was transmitted at this time.
|
||||
// We'll just transmit the data with normal writes later.
|
||||
return 0;
|
||||
}
|
||||
if (res == ERROR_ECONNREFUSED_NEGATIVE) {
|
||||
throw new PortUnreachableException("sendToAddresses failed");
|
||||
}
|
||||
@ -467,11 +502,16 @@ public class Socket extends FileDescriptor {
|
||||
private static native byte[] localAddress(int fd);
|
||||
|
||||
private static native int sendTo(
|
||||
int fd, boolean ipv6, ByteBuffer buf, int pos, int limit, byte[] address, int scopeId, int port);
|
||||
int fd, boolean ipv6, ByteBuffer buf, int pos, int limit, byte[] address, int scopeId, int port,
|
||||
int flags);
|
||||
|
||||
private static native int sendToAddress(
|
||||
int fd, boolean ipv6, long memoryAddress, int pos, int limit, byte[] address, int scopeId, int port);
|
||||
int fd, boolean ipv6, long memoryAddress, int pos, int limit, byte[] address, int scopeId, int port,
|
||||
int flags);
|
||||
|
||||
private static native int sendToAddresses(
|
||||
int fd, boolean ipv6, long memoryAddress, int length, byte[] address, int scopeId, int port);
|
||||
int fd, boolean ipv6, long memoryAddress, int length, byte[] address, int scopeId, int port,
|
||||
int flags);
|
||||
|
||||
private static native DatagramSocketAddress recvFrom(
|
||||
int fd, ByteBuffer buf, int pos, int limit) throws IOException;
|
||||
@ -479,6 +519,7 @@ public class Socket extends FileDescriptor {
|
||||
int fd, long memoryAddress, int pos, int limit) throws IOException;
|
||||
private static native int recvFd(int fd);
|
||||
private static native int sendFd(int socketFd, int fd);
|
||||
private static native int msgFastopen();
|
||||
|
||||
private static native int newSocketStreamFd(boolean ipv6);
|
||||
private static native int newSocketDgramFd(boolean ipv6);
|
||||
|
@ -25,7 +25,7 @@ import java.net.InetAddress;
|
||||
import java.net.NetworkInterface;
|
||||
|
||||
/**
|
||||
* A {@link ChannelOption} allows to configure a {@link ChannelConfig} in a type-safe
|
||||
* A {@link ChannelOption} allows to configure a {@link ChannelConfig} in a type-safe
|
||||
* way. Which {@link ChannelOption} is supported depends on the actual implementation
|
||||
* of {@link ChannelConfig} and may depend on the nature of the transport it belongs
|
||||
* to.
|
||||
@ -126,6 +126,7 @@ public class ChannelOption<T> extends AbstractConstant<ChannelOption<T>> {
|
||||
public static final ChannelOption<Boolean> IP_MULTICAST_LOOP_DISABLED = valueOf("IP_MULTICAST_LOOP_DISABLED");
|
||||
|
||||
public static final ChannelOption<Boolean> TCP_NODELAY = valueOf("TCP_NODELAY");
|
||||
public static final ChannelOption<Boolean> TCP_FASTOPEN_CONNECT = valueOf("TCP_FASTOPEN_CONNECT");
|
||||
|
||||
@Deprecated
|
||||
public static final ChannelOption<Boolean> DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION =
|
||||
|
Loading…
Reference in New Issue
Block a user