Add support for client-side TCP FastOpen to KQueue MacOS (#11560)

Motivation:
The MacOS-specific `connectx(2)` system call make it possible to establish client-side connections with TCP FastOpen.

Modification:
Add support for TCP FastOpen to the KQueue transport, and add the `connectx(2)` system call to `BsdSocket`.

Result:
It's now possible to use TCP FastOpen when initiating connections on MacOS.
This commit is contained in:
Chris Vest 2021-08-12 13:38:46 +02:00
parent bcdc07fe13
commit 25699e44e9
14 changed files with 300 additions and 28 deletions

View File

@ -30,11 +30,9 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
import io.netty.util.internal.StringUtil;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.Timeout;
import org.opentest4j.TestAbortedException;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -177,8 +175,9 @@ public class SocketConnectTest extends AbstractSocketTest {
} }
protected void enableTcpFastOpen(ServerBootstrap sb, Bootstrap cb) { protected void enableTcpFastOpen(ServerBootstrap sb, Bootstrap cb) {
throw new TestAbortedException( // TFO is an almost-pure optimisation and should not change any observable behaviour in our tests.
"Support for testing TCP_FASTOPEN not enabled for " + StringUtil.simpleClassName(this)); sb.option(ChannelOption.TCP_FASTOPEN, 5);
cb.option(ChannelOption.TCP_FASTOPEN_CONNECT, true);
} }
private static void assertLocalAddress(InetSocketAddress address) { private static void assertLocalAddress(InetSocketAddress address) {

View File

@ -29,10 +29,4 @@ public class EpollSocketConnectTest extends SocketConnectTest {
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() { protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen(); return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen();
} }
@Override
protected void enableTcpFastOpen(ServerBootstrap sb, Bootstrap cb) {
sb.option(ChannelOption.TCP_FASTOPEN, 5);
cb.option(ChannelOption.TCP_FASTOPEN_CONNECT, true);
}
} }

View File

@ -13,6 +13,7 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
#include <assert.h>
#include <stdlib.h> #include <stdlib.h>
#include <errno.h> #include <errno.h>
#include <string.h> #include <string.h>
@ -83,6 +84,60 @@ static jlong netty_kqueue_bsdsocket_sendFile(JNIEnv* env, jclass clazz, jint soc
return res < 0 ? -err : 0; return res < 0 ? -err : 0;
} }
static jint netty_kqueue_bsdsocket_connectx(JNIEnv* env, jclass clazz,
jint socketFd,
jint socketInterface,
jboolean sourceIPv6, jbyteArray sourceAddress, jint sourceScopeId, jint sourcePort,
jboolean destinationIPv6, jbyteArray destinationAddress, jint destinationScopeId, jint destinationPort,
jint flags,
jlong iovAddress, jint iovCount, jint iovDataLength) {
#ifdef __APPLE__ // connectx(2) is only defined on Darwin.
sa_endpoints_t endpoints;
endpoints.sae_srcif = (unsigned int) socketInterface;
endpoints.sae_srcaddr = NULL;
endpoints.sae_srcaddrlen = 0;
endpoints.sae_dstaddr = NULL;
endpoints.sae_dstaddrlen = 0;
struct sockaddr_storage srcaddr;
socklen_t srcaddrlen;
struct sockaddr_storage dstaddr;
socklen_t dstaddrlen;
if (NULL != sourceAddress) {
if (-1 == netty_unix_socket_initSockaddr(env,
sourceIPv6, sourceAddress, sourceScopeId, sourcePort, &srcaddr, &srcaddrlen)) {
netty_unix_errors_throwIOException(env,
"Source address specified, but could not be converted to sockaddr.");
return -EINVAL;
}
endpoints.sae_srcaddr = (const struct sockaddr*) &srcaddr;
endpoints.sae_srcaddrlen = srcaddrlen;
}
assert(destinationAddress != NULL); // Java side will ensure destination is never null.
if (-1 == netty_unix_socket_initSockaddr(env,
destinationIPv6, destinationAddress, destinationScopeId, destinationPort, &dstaddr, &dstaddrlen)) {
netty_unix_errors_throwIOException(env, "Destination address could not be converted to sockaddr.");
return -EINVAL;
}
endpoints.sae_dstaddr = (const struct sockaddr*) &dstaddr;
endpoints.sae_dstaddrlen = dstaddrlen;
int socket = (int) socketFd;
const struct iovec* iov = (const struct iovec*) iovAddress;
unsigned int iovcnt = (unsigned int) iovCount;
size_t len = (size_t) iovDataLength;
int result = connectx(socket, &endpoints, SAE_ASSOCID_ANY, flags, iov, iovcnt, &len, NULL);
if (result == -1) {
return -errno;
}
return (jint) len;
#else
return -ENOSYS;
#endif
}
static void netty_kqueue_bsdsocket_setAcceptFilter(JNIEnv* env, jclass clazz, jint fd, jstring afName, jstring afArg) { static void netty_kqueue_bsdsocket_setAcceptFilter(JNIEnv* env, jclass clazz, jint fd, jstring afName, jstring afArg) {
#ifdef SO_ACCEPTFILTER #ifdef SO_ACCEPTFILTER
struct accept_filter_arg af; struct accept_filter_arg af;
@ -196,7 +251,8 @@ static const JNINativeMethod fixed_method_table[] = {
{ "setSndLowAt", "(II)V", (void *) netty_kqueue_bsdsocket_setSndLowAt }, { "setSndLowAt", "(II)V", (void *) netty_kqueue_bsdsocket_setSndLowAt },
{ "getAcceptFilter", "(I)[Ljava/lang/String;", (void *) netty_kqueue_bsdsocket_getAcceptFilter }, { "getAcceptFilter", "(I)[Ljava/lang/String;", (void *) netty_kqueue_bsdsocket_getAcceptFilter },
{ "getTcpNoPush", "(I)I", (void *) netty_kqueue_bsdsocket_getTcpNoPush }, { "getTcpNoPush", "(I)I", (void *) netty_kqueue_bsdsocket_getTcpNoPush },
{ "getSndLowAt", "(I)I", (void *) netty_kqueue_bsdsocket_getSndLowAt } { "getSndLowAt", "(I)I", (void *) netty_kqueue_bsdsocket_getSndLowAt },
{ "connectx", "(IIZ[BIIZ[BIIIJII)I", (void *) netty_kqueue_bsdsocket_connectx }
}; };
static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]); static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]);

View File

@ -60,6 +60,12 @@
#ifndef NOTE_DISCONNECTED #ifndef NOTE_DISCONNECTED
#define NOTE_DISCONNECTED 0x00001000 #define NOTE_DISCONNECTED 0x00001000
#endif /* NOTE_DISCONNECTED */ #endif /* NOTE_DISCONNECTED */
#ifndef CONNECT_RESUME_ON_READ_WRITE
#define CONNECT_RESUME_ON_READ_WRITE 0x1
#endif /* CONNECT_RESUME_ON_READ_WRITE */
#ifndef CONNECT_DATA_IDEMPOTENT
#define CONNECT_DATA_IDEMPOTENT 0x2
#endif /* CONNECT_DATA_IDEMPOTENT */
#else #else
#ifndef EVFILT_SOCK #ifndef EVFILT_SOCK
#define EVFILT_SOCK 0 // Disabled #define EVFILT_SOCK 0 // Disabled
@ -73,6 +79,12 @@
#ifndef NOTE_DISCONNECTED #ifndef NOTE_DISCONNECTED
#define NOTE_DISCONNECTED 0 #define NOTE_DISCONNECTED 0
#endif /* NOTE_DISCONNECTED */ #endif /* NOTE_DISCONNECTED */
#ifndef CONNECT_RESUME_ON_READ_WRITE
#define CONNECT_RESUME_ON_READ_WRITE 0
#endif /* CONNECT_RESUME_ON_READ_WRITE */
#ifndef CONNECT_DATA_IDEMPOTENT
#define CONNECT_DATA_IDEMPOTENT 0
#endif /* CONNECT_DATA_IDEMPOTENT */
#endif /* __APPLE__ */ #endif /* __APPLE__ */
static clockid_t waitClockId = 0; // initialized by netty_unix_util_initialize_wait_clock static clockid_t waitClockId = 0; // initialized by netty_unix_util_initialize_wait_clock
@ -247,6 +259,14 @@ static jshort netty_kqueue_native_noteDisconnected(JNIEnv* env, jclass clazz) {
return NOTE_DISCONNECTED; return NOTE_DISCONNECTED;
} }
static jint netty_kqueue_bsdsocket_connectResumeOnReadWrite(JNIEnv *env) {
return CONNECT_RESUME_ON_READ_WRITE;
}
static jint netty_kqueue_bsdsocket_connectDataIdempotent(JNIEnv *env) {
return CONNECT_DATA_IDEMPOTENT;
}
// JNI Method Registration Table Begin // JNI Method Registration Table Begin
static const JNINativeMethod statically_referenced_fixed_method_table[] = { static const JNINativeMethod statically_referenced_fixed_method_table[] = {
{ "evfiltRead", "()S", (void *) netty_kqueue_native_evfiltRead }, { "evfiltRead", "()S", (void *) netty_kqueue_native_evfiltRead },
@ -262,7 +282,9 @@ static const JNINativeMethod statically_referenced_fixed_method_table[] = {
{ "evError", "()S", (void *) netty_kqueue_native_evError }, { "evError", "()S", (void *) netty_kqueue_native_evError },
{ "noteReadClosed", "()S", (void *) netty_kqueue_native_noteReadClosed }, { "noteReadClosed", "()S", (void *) netty_kqueue_native_noteReadClosed },
{ "noteConnReset", "()S", (void *) netty_kqueue_native_noteConnReset }, { "noteConnReset", "()S", (void *) netty_kqueue_native_noteConnReset },
{ "noteDisconnected", "()S", (void *) netty_kqueue_native_noteDisconnected } { "noteDisconnected", "()S", (void *) netty_kqueue_native_noteDisconnected },
{ "connectResumeOnReadWrite", "()I", (void *) netty_kqueue_bsdsocket_connectResumeOnReadWrite },
{ "connectDataIdempotent", "()I", (void *) netty_kqueue_bsdsocket_connectDataIdempotent }
}; };
static const jint statically_referenced_fixed_method_table_size = sizeof(statically_referenced_fixed_method_table) / sizeof(statically_referenced_fixed_method_table[0]); static const jint statically_referenced_fixed_method_table_size = sizeof(statically_referenced_fixed_method_table) / sizeof(statically_referenced_fixed_method_table[0]);
static const JNINativeMethod fixed_method_table[] = { static const JNINativeMethod fixed_method_table[] = {

View File

@ -387,7 +387,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
final void readReadyFinally(ChannelConfig config) { final void readReadyFinally(ChannelConfig config) {
maybeMoreDataToRead = allocHandle.maybeMoreDataToRead(); maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
if (allocHandle.isReadEOF() || (readPending && maybeMoreDataToRead)) { if (allocHandle.isReadEOF() || readPending && maybeMoreDataToRead) {
// trigger a read again as there may be something left to read and because of ET we // trigger a read again as there may be something left to read and because of ET we
// will not get notified again until we read everything from the socket // will not get notified again until we read everything from the socket
// //
@ -691,7 +691,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
socket.bind(localAddress); socket.bind(localAddress);
} }
boolean connected = doConnect0(remoteAddress); boolean connected = doConnect0(remoteAddress, localAddress);
if (connected) { if (connected) {
remote = remoteSocketAddr == null? remote = remoteSocketAddr == null?
remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress()); remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
@ -703,10 +703,10 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
return connected; return connected;
} }
private boolean doConnect0(SocketAddress remote) throws Exception { protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
boolean success = false; boolean success = false;
try { try {
boolean connected = socket.connect(remote); boolean connected = socket.connect(remoteAddress);
if (!connected) { if (!connected) {
writeFilter(true); writeFilter(true);
} }

View File

@ -16,13 +16,21 @@
package io.netty.channel.kqueue; package io.netty.channel.kqueue;
import io.netty.channel.DefaultFileRegion; import io.netty.channel.DefaultFileRegion;
import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.PeerCredentials; import io.netty.channel.unix.PeerCredentials;
import io.netty.channel.unix.Socket; import io.netty.channel.unix.Socket;
import java.io.IOException; import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import static io.netty.channel.kqueue.AcceptFilter.PLATFORM_UNSUPPORTED; import static io.netty.channel.kqueue.AcceptFilter.PLATFORM_UNSUPPORTED;
import static io.netty.channel.kqueue.Native.CONNECT_TCP_FASTOPEN;
import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
import static io.netty.channel.unix.Errors.ioResult; import static io.netty.channel.unix.Errors.ioResult;
import static io.netty.channel.unix.NativeInetAddress.ipv4MappedIpv6Address;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
/** /**
* A socket which provides access BSD native methods. * A socket which provides access BSD native methods.
@ -34,6 +42,12 @@ final class BsdSocket extends Socket {
private static final int APPLE_SND_LOW_AT_MAX = 1 << 17; private static final int APPLE_SND_LOW_AT_MAX = 1 << 17;
private static final int FREEBSD_SND_LOW_AT_MAX = 1 << 15; private static final int FREEBSD_SND_LOW_AT_MAX = 1 << 15;
static final int BSD_SND_LOW_AT_MAX = Math.min(APPLE_SND_LOW_AT_MAX, FREEBSD_SND_LOW_AT_MAX); static final int BSD_SND_LOW_AT_MAX = Math.min(APPLE_SND_LOW_AT_MAX, FREEBSD_SND_LOW_AT_MAX);
/**
* The `endpoints` structure passed to `connectx(2)` has an optional "source interface" field,
* which is the index of the network interface to use.
* According to `if_nametoindex(3)`, the value 0 is used when no interface is specified.
*/
private static final int UNSPECIFIED_SOURCE_INTERFACE = 0;
BsdSocket(int fd) { BsdSocket(int fd) {
super(fd); super(fd);
@ -51,7 +65,7 @@ final class BsdSocket extends Socket {
setSndLowAt(intValue(), lowAt); setSndLowAt(intValue(), lowAt);
} }
boolean isTcpNoPush() throws IOException { boolean isTcpNoPush() throws IOException {
return getTcpNoPush(intValue()) != 0; return getTcpNoPush(intValue()) != 0;
} }
@ -80,6 +94,96 @@ final class BsdSocket extends Socket {
return ioResult("sendfile", (int) res); return ioResult("sendfile", (int) res);
} }
/**
* Establish a connection to the given destination address, and send the given data to it.
*
* <strong>Note:</strong> This method relies on the {@code connectx(2)} system call, which is MacOS specific.
*
* @param source the source address we are connecting from.
* @param destination the destination address we are connecting to.
* @param data the data to copy to the kernel-side socket buffer.
* @param tcpFastOpen if {@code true}, set the flags needed to enable TCP FastOpen connecting.
* @return The number of bytes copied to the kernel-side socket buffer, or the number of bytes sent to the
* destination. This number is <em>negative</em> if connecting is left in an in-progress state,
* or <em>positive</em> if the connection was immediately established.
* @throws IOException if an IO error occurs, if the {@code data} is too big to send in one go,
* or if the system call is not supported on your platform.
*/
int connectx(InetSocketAddress source, InetSocketAddress destination, IovArray data, boolean tcpFastOpen)
throws IOException {
checkNotNull(destination, "Destination InetSocketAddress cannot be null.");
int flags = tcpFastOpen ? CONNECT_TCP_FASTOPEN : 0;
boolean sourceIPv6;
byte[] sourceAddress;
int sourceScopeId;
int sourcePort;
if (source == null) {
sourceIPv6 = false;
sourceAddress = null;
sourceScopeId = 0;
sourcePort = 0;
} else {
InetAddress sourceInetAddress = source.getAddress();
sourceIPv6 = sourceInetAddress instanceof Inet6Address;
if (sourceIPv6) {
sourceAddress = sourceInetAddress.getAddress();
sourceScopeId = ((Inet6Address) sourceInetAddress).getScopeId();
} else {
// convert to ipv4 mapped ipv6 address;
sourceScopeId = 0;
sourceAddress = ipv4MappedIpv6Address(sourceInetAddress.getAddress());
}
sourcePort = source.getPort();
}
InetAddress destinationInetAddress = destination.getAddress();
boolean destinationIPv6 = destinationInetAddress instanceof Inet6Address;
byte[] destinationAddress;
int destinationScopeId;
if (destinationIPv6) {
destinationAddress = destinationInetAddress.getAddress();
destinationScopeId = ((Inet6Address) destinationInetAddress).getScopeId();
} else {
// convert to ipv4 mapped ipv6 address;
destinationScopeId = 0;
destinationAddress = ipv4MappedIpv6Address(destinationInetAddress.getAddress());
}
int destinationPort = destination.getPort();
long iovAddress;
int iovCount;
int iovDataLength;
if (data == null || data.count() == 0) {
iovAddress = 0;
iovCount = 0;
iovDataLength = 0;
} else {
iovAddress = data.memoryAddress(0);
iovCount = data.count();
long size = data.size();
if (size > Integer.MAX_VALUE) {
throw new IOException("IovArray.size() too big: " + size + " bytes.");
}
iovDataLength = (int) size;
}
int result = connectx(intValue(),
UNSPECIFIED_SOURCE_INTERFACE, sourceIPv6, sourceAddress, sourceScopeId, sourcePort,
destinationIPv6, destinationAddress, destinationScopeId, destinationPort,
flags, iovAddress, iovCount, iovDataLength);
if (result == ERRNO_EINPROGRESS_NEGATIVE) {
// This is normal for non-blocking sockets.
// We'll know the connection has been established when the socket is selectable for writing.
// Tell the channel the data was written, so the outbound buffer can update its position.
return -iovDataLength;
}
if (result < 0) {
return ioResult("connectx", result);
}
return result;
}
public static BsdSocket newSocketStream() { public static BsdSocket newSocketStream() {
return new BsdSocket(newSocketStream0()); return new BsdSocket(newSocketStream0());
} }
@ -99,12 +203,32 @@ final class BsdSocket extends Socket {
private static native long sendFile(int socketFd, DefaultFileRegion src, long baseOffset, private static native long sendFile(int socketFd, DefaultFileRegion src, long baseOffset,
long offset, long length) throws IOException; long offset, long length) throws IOException;
/**
* @return If successful, zero or positive number of bytes transfered, otherwise negative errno.
*/
private static native int connectx(
int socketFd,
// sa_endpoints_t *endpoints:
int sourceInterface,
boolean sourceIPv6, byte[] sourceAddress, int sourceScopeId, int sourcePort,
boolean destinationIPv6, byte[] destinationAddress, int destinationScopeId, int destinationPort,
// sae_associd_t associd is reserved
int flags,
long iovAddress, int iovCount, int iovDataLength
// sae_connid_t *connid is reserved
);
private static native String[] getAcceptFilter(int fd) throws IOException; private static native String[] getAcceptFilter(int fd) throws IOException;
private static native int getTcpNoPush(int fd) throws IOException; private static native int getTcpNoPush(int fd) throws IOException;
private static native int getSndLowAt(int fd) throws IOException; private static native int getSndLowAt(int fd) throws IOException;
private static native PeerCredentials getPeerCredentials(int fd) throws IOException; private static native PeerCredentials getPeerCredentials(int fd) throws IOException;
private static native void setAcceptFilter(int fd, String filterName, String filterArgs) throws IOException; private static native void setAcceptFilter(int fd, String filterName, String filterArgs) throws IOException;
private static native void setTcpNoPush(int fd, int tcpNoPush) throws IOException; private static native void setTcpNoPush(int fd, int tcpNoPush) throws IOException;
private static native void setSndLowAt(int fd, int lowAt) throws IOException; private static native void setSndLowAt(int fd, int lowAt) throws IOException;
} }

View File

@ -15,14 +15,18 @@
*/ */
package io.netty.channel.kqueue; package io.netty.channel.kqueue;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.unix.IovArray;
import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.UnstableApi; import io.netty.util.internal.UnstableApi;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@UnstableApi @UnstableApi
@ -64,6 +68,35 @@ public final class KQueueSocketChannel extends AbstractKQueueStreamChannel imple
return (ServerSocketChannel) super.parent(); return (ServerSocketChannel) super.parent();
} }
@Override
protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (config.isTcpFastOpenConnect()) {
ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
outbound.addFlush();
Object curr;
if ((curr = outbound.current()) instanceof ByteBuf) {
ByteBuf initialData = (ByteBuf) curr;
// Don't bother with TCP FastOpen if we don't have any initial data to send anyway.
if (initialData.isReadable()) {
IovArray iov = new IovArray(config.getAllocator().directBuffer());
try {
iov.add(initialData, initialData.readerIndex(), initialData.readableBytes());
int bytesSent = socket.connectx(
(InetSocketAddress) localAddress, (InetSocketAddress) remoteAddress, iov, true);
writeFilter(true);
outbound.removeBytes(Math.abs(bytesSent));
// The `connectx` method returns a negative number if connection is in-progress.
// So we should return `true` to indicate that connection was established, if it's positive.
return bytesSent > 0;
} finally {
iov.release();
}
}
}
}
return super.doConnect0(remoteAddress, localAddress);
}
@Override @Override
protected AbstractKQueueUnsafe newUnsafe() { protected AbstractKQueueUnsafe newUnsafe() {
return new KQueueSocketChannelUnsafe(); return new KQueueSocketChannelUnsafe();

View File

@ -40,6 +40,7 @@ import static io.netty.channel.kqueue.KQueueChannelOption.TCP_NOPUSH;
@UnstableApi @UnstableApi
public final class KQueueSocketChannelConfig extends KQueueDuplexChannelConfig implements SocketChannelConfig { public final class KQueueSocketChannelConfig extends KQueueDuplexChannelConfig implements SocketChannelConfig {
private volatile boolean tcpFastopen;
KQueueSocketChannelConfig(KQueueSocketChannel channel) { KQueueSocketChannelConfig(KQueueSocketChannel channel) {
super(channel); super(channel);
@ -87,6 +88,9 @@ public final class KQueueSocketChannelConfig extends KQueueDuplexChannelConfig i
if (option == TCP_NOPUSH) { if (option == TCP_NOPUSH) {
return (T) Boolean.valueOf(isTcpNoPush()); return (T) Boolean.valueOf(isTcpNoPush());
} }
if (option == ChannelOption.TCP_FASTOPEN_CONNECT) {
return (T) Boolean.valueOf(isTcpFastOpenConnect());
}
return super.getOption(option); return super.getOption(option);
} }
@ -112,6 +116,8 @@ public final class KQueueSocketChannelConfig extends KQueueDuplexChannelConfig i
setSndLowAt((Integer) value); setSndLowAt((Integer) value);
} else if (option == TCP_NOPUSH) { } else if (option == TCP_NOPUSH) {
setTcpNoPush((Boolean) value); setTcpNoPush((Boolean) value);
} else if (option == ChannelOption.TCP_FASTOPEN_CONNECT) {
setTcpFastOpenConnect((Boolean) value);
} else { } else {
return super.setOption(option, value); return super.setOption(option, value);
} }
@ -285,6 +291,21 @@ public final class KQueueSocketChannelConfig extends KQueueDuplexChannelConfig i
} }
} }
/**
* Enables client TCP fast open, if available.
*/
public KQueueSocketChannelConfig setTcpFastOpenConnect(boolean fastOpenConnect) {
tcpFastopen = fastOpenConnect;
return this;
}
/**
* Returns {@code true} if TCP fast open is enabled, {@code false} otherwise.
*/
public boolean isTcpFastOpenConnect() {
return tcpFastopen;
}
@Override @Override
public KQueueSocketChannelConfig setRcvAllocTransportProvidesGuess(boolean transportProvidesGuess) { public KQueueSocketChannelConfig setRcvAllocTransportProvidesGuess(boolean transportProvidesGuess) {
super.setRcvAllocTransportProvidesGuess(transportProvidesGuess); super.setRcvAllocTransportProvidesGuess(transportProvidesGuess);

View File

@ -46,4 +46,8 @@ final class KQueueStaticallyReferencedJniMethods {
static native short evfiltWrite(); static native short evfiltWrite();
static native short evfiltUser(); static native short evfiltUser();
static native short evfiltSock(); static native short evfiltSock();
// Flags for connectx(2)
static native int connectResumeOnReadWrite();
static native int connectDataIdempotent();
} }

View File

@ -29,6 +29,8 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.connectDataIdempotent;
import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.connectResumeOnReadWrite;
import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evAdd; import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evAdd;
import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evClear; import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evClear;
import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evDelete; import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evDelete;
@ -104,6 +106,11 @@ final class Native {
static final short EVFILT_USER = evfiltUser(); static final short EVFILT_USER = evfiltUser();
static final short EVFILT_SOCK = evfiltSock(); static final short EVFILT_SOCK = evfiltSock();
// Flags for connectx(2)
private static final int CONNECT_RESUME_ON_READ_WRITE = connectResumeOnReadWrite();
private static final int CONNECT_DATA_IDEMPOTENT = connectDataIdempotent();
static final int CONNECT_TCP_FASTOPEN = CONNECT_RESUME_ON_READ_WRITE | CONNECT_DATA_IDEMPOTENT;
static FileDescriptor newKQueue() { static FileDescriptor newKQueue() {
return new FileDescriptor(kqueueCreate()); return new FileDescriptor(kqueueCreate());
} }

View File

@ -24,6 +24,6 @@ import java.util.List;
public class KQueueSocketChannelNotYetConnectedTest extends SocketChannelNotYetConnectedTest { public class KQueueSocketChannelNotYetConnectedTest extends SocketChannelNotYetConnectedTest {
@Override @Override
protected List<TestsuitePermutation.BootstrapFactory<Bootstrap>> newFactories() { protected List<TestsuitePermutation.BootstrapFactory<Bootstrap>> newFactories() {
return KQueueSocketTestPermutation.INSTANCE.clientSocket(); return KQueueSocketTestPermutation.INSTANCE.clientSocketWithFastOpen();
} }
} }

View File

@ -19,6 +19,7 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.MultithreadEventLoopGroup;
@ -60,7 +61,6 @@ class KQueueSocketTestPermutation extends SocketTestPermutation {
return list; return list;
} }
@SuppressWarnings("unchecked")
@Override @Override
public List<BootstrapFactory<ServerBootstrap>> serverSocket() { public List<BootstrapFactory<ServerBootstrap>> serverSocket() {
List<BootstrapFactory<ServerBootstrap>> toReturn = new ArrayList<>(); List<BootstrapFactory<ServerBootstrap>> toReturn = new ArrayList<>();
@ -73,20 +73,32 @@ class KQueueSocketTestPermutation extends SocketTestPermutation {
return toReturn; return toReturn;
} }
@SuppressWarnings("unchecked")
@Override @Override
public List<BootstrapFactory<Bootstrap>> clientSocket() { public List<BootstrapFactory<Bootstrap>> clientSocket() {
return Arrays.asList( List<BootstrapFactory<Bootstrap>> toReturn = new ArrayList<BootstrapFactory<Bootstrap>>();
() -> new Bootstrap().group(KQUEUE_WORKER_GROUP).channel(KQueueSocketChannel.class),
() -> new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class) toReturn.add(() -> new Bootstrap().group(KQUEUE_WORKER_GROUP).channel(KQueueSocketChannel.class));
); toReturn.add(() -> new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class));
return toReturn;
}
@Override
public List<BootstrapFactory<Bootstrap>> clientSocketWithFastOpen() {
List<BootstrapFactory<Bootstrap>> factories = clientSocket();
int insertIndex = factories.size() - 1; // Keep NIO fixture last.
factories.add(insertIndex,
() -> new Bootstrap().group(KQUEUE_WORKER_GROUP).channel(KQueueSocketChannel.class)
.option(ChannelOption.TCP_FASTOPEN_CONNECT, true));
return factories;
} }
@Override @Override
public List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> datagram( public List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> datagram(
final InternetProtocolFamily family) { final InternetProtocolFamily family) {
// Make the list of Bootstrap factories. // Make the list of Bootstrap factories.
@SuppressWarnings("unchecked")
List<BootstrapFactory<Bootstrap>> bfs = Arrays.asList( List<BootstrapFactory<Bootstrap>> bfs = Arrays.asList(
() -> new Bootstrap().group(nioWorkerGroup).channelFactory(new ChannelFactory<Channel>() { () -> new Bootstrap().group(nioWorkerGroup).channelFactory(new ChannelFactory<Channel>() {
@Override @Override

View File

@ -25,6 +25,6 @@ public class KqueueWriteBeforeRegisteredTest extends WriteBeforeRegisteredTest {
@Override @Override
protected List<TestsuitePermutation.BootstrapFactory<Bootstrap>> newFactories() { protected List<TestsuitePermutation.BootstrapFactory<Bootstrap>> newFactories() {
return KQueueSocketTestPermutation.INSTANCE.clientSocket(); return KQueueSocketTestPermutation.INSTANCE.clientSocketWithFastOpen();
} }
} }

View File

@ -52,7 +52,7 @@ public class Socket extends FileDescriptor {
public Socket(int fd) { public Socket(int fd) {
super(fd); super(fd);
this.ipv6 = isIPv6(fd); ipv6 = isIPv6(fd);
} }
/** /**
@ -72,7 +72,7 @@ public class Socket extends FileDescriptor {
// shutdown anything. This is because if the underlying FD is reused and we still have an object which // shutdown anything. This is because if the underlying FD is reused and we still have an object which
// represents the previous incarnation of the FD we need to be sure we don't inadvertently shutdown the // represents the previous incarnation of the FD we need to be sure we don't inadvertently shutdown the
// "new" FD without explicitly having a change. // "new" FD without explicitly having a change.
final int oldState = this.state; final int oldState = state;
if (isClosed(oldState)) { if (isClosed(oldState)) {
throw new ClosedChannelException(); throw new ClosedChannelException();
} }