diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectTest.java index b1c4d48b07..8a55e93a74 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectTest.java @@ -30,11 +30,9 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; -import io.netty.util.internal.StringUtil; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; -import org.opentest4j.TestAbortedException; import java.io.ByteArrayOutputStream; import java.net.InetSocketAddress; @@ -177,8 +175,9 @@ public class SocketConnectTest extends AbstractSocketTest { } protected void enableTcpFastOpen(ServerBootstrap sb, Bootstrap cb) { - throw new TestAbortedException( - "Support for testing TCP_FASTOPEN not enabled for " + StringUtil.simpleClassName(this)); + // TFO is an almost-pure optimisation and should not change any observable behaviour in our tests. + sb.option(ChannelOption.TCP_FASTOPEN, 5); + cb.option(ChannelOption.TCP_FASTOPEN_CONNECT, true); } private static void assertLocalAddress(InetSocketAddress address) { diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketConnectTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketConnectTest.java index 459de4ef77..31f3a2644e 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketConnectTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketConnectTest.java @@ -29,10 +29,4 @@ public class EpollSocketConnectTest extends SocketConnectTest { protected List> newFactories() { return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen(); } - - @Override - protected void enableTcpFastOpen(ServerBootstrap sb, Bootstrap cb) { - sb.option(ChannelOption.TCP_FASTOPEN, 5); - cb.option(ChannelOption.TCP_FASTOPEN_CONNECT, true); - } } diff --git a/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c b/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c index 5f021d0260..81690c4600 100644 --- a/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c +++ b/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c @@ -13,6 +13,7 @@ * License for the specific language governing permissions and limitations * under the License. */ +#include #include #include #include @@ -83,6 +84,60 @@ static jlong netty_kqueue_bsdsocket_sendFile(JNIEnv* env, jclass clazz, jint soc return res < 0 ? -err : 0; } +static jint netty_kqueue_bsdsocket_connectx(JNIEnv* env, jclass clazz, + jint socketFd, + jint socketInterface, + jboolean sourceIPv6, jbyteArray sourceAddress, jint sourceScopeId, jint sourcePort, + jboolean destinationIPv6, jbyteArray destinationAddress, jint destinationScopeId, jint destinationPort, + jint flags, + jlong iovAddress, jint iovCount, jint iovDataLength) { +#ifdef __APPLE__ // connectx(2) is only defined on Darwin. + sa_endpoints_t endpoints; + endpoints.sae_srcif = (unsigned int) socketInterface; + endpoints.sae_srcaddr = NULL; + endpoints.sae_srcaddrlen = 0; + endpoints.sae_dstaddr = NULL; + endpoints.sae_dstaddrlen = 0; + + struct sockaddr_storage srcaddr; + socklen_t srcaddrlen; + struct sockaddr_storage dstaddr; + socklen_t dstaddrlen; + + if (NULL != sourceAddress) { + if (-1 == netty_unix_socket_initSockaddr(env, + sourceIPv6, sourceAddress, sourceScopeId, sourcePort, &srcaddr, &srcaddrlen)) { + netty_unix_errors_throwIOException(env, + "Source address specified, but could not be converted to sockaddr."); + return -EINVAL; + } + endpoints.sae_srcaddr = (const struct sockaddr*) &srcaddr; + endpoints.sae_srcaddrlen = srcaddrlen; + } + + assert(destinationAddress != NULL); // Java side will ensure destination is never null. + if (-1 == netty_unix_socket_initSockaddr(env, + destinationIPv6, destinationAddress, destinationScopeId, destinationPort, &dstaddr, &dstaddrlen)) { + netty_unix_errors_throwIOException(env, "Destination address could not be converted to sockaddr."); + return -EINVAL; + } + endpoints.sae_dstaddr = (const struct sockaddr*) &dstaddr; + endpoints.sae_dstaddrlen = dstaddrlen; + + int socket = (int) socketFd; + const struct iovec* iov = (const struct iovec*) iovAddress; + unsigned int iovcnt = (unsigned int) iovCount; + size_t len = (size_t) iovDataLength; + int result = connectx(socket, &endpoints, SAE_ASSOCID_ANY, flags, iov, iovcnt, &len, NULL); + if (result == -1) { + return -errno; + } + return (jint) len; +#else + return -ENOSYS; +#endif +} + static void netty_kqueue_bsdsocket_setAcceptFilter(JNIEnv* env, jclass clazz, jint fd, jstring afName, jstring afArg) { #ifdef SO_ACCEPTFILTER struct accept_filter_arg af; @@ -196,7 +251,8 @@ static const JNINativeMethod fixed_method_table[] = { { "setSndLowAt", "(II)V", (void *) netty_kqueue_bsdsocket_setSndLowAt }, { "getAcceptFilter", "(I)[Ljava/lang/String;", (void *) netty_kqueue_bsdsocket_getAcceptFilter }, { "getTcpNoPush", "(I)I", (void *) netty_kqueue_bsdsocket_getTcpNoPush }, - { "getSndLowAt", "(I)I", (void *) netty_kqueue_bsdsocket_getSndLowAt } + { "getSndLowAt", "(I)I", (void *) netty_kqueue_bsdsocket_getSndLowAt }, + { "connectx", "(IIZ[BIIZ[BIIIJII)I", (void *) netty_kqueue_bsdsocket_connectx } }; static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]); diff --git a/transport-native-kqueue/src/main/c/netty_kqueue_native.c b/transport-native-kqueue/src/main/c/netty_kqueue_native.c index 3ac27a1e51..ceda160a2e 100644 --- a/transport-native-kqueue/src/main/c/netty_kqueue_native.c +++ b/transport-native-kqueue/src/main/c/netty_kqueue_native.c @@ -60,6 +60,12 @@ #ifndef NOTE_DISCONNECTED #define NOTE_DISCONNECTED 0x00001000 #endif /* NOTE_DISCONNECTED */ +#ifndef CONNECT_RESUME_ON_READ_WRITE +#define CONNECT_RESUME_ON_READ_WRITE 0x1 +#endif /* CONNECT_RESUME_ON_READ_WRITE */ +#ifndef CONNECT_DATA_IDEMPOTENT +#define CONNECT_DATA_IDEMPOTENT 0x2 +#endif /* CONNECT_DATA_IDEMPOTENT */ #else #ifndef EVFILT_SOCK #define EVFILT_SOCK 0 // Disabled @@ -73,6 +79,12 @@ #ifndef NOTE_DISCONNECTED #define NOTE_DISCONNECTED 0 #endif /* NOTE_DISCONNECTED */ +#ifndef CONNECT_RESUME_ON_READ_WRITE +#define CONNECT_RESUME_ON_READ_WRITE 0 +#endif /* CONNECT_RESUME_ON_READ_WRITE */ +#ifndef CONNECT_DATA_IDEMPOTENT +#define CONNECT_DATA_IDEMPOTENT 0 +#endif /* CONNECT_DATA_IDEMPOTENT */ #endif /* __APPLE__ */ static clockid_t waitClockId = 0; // initialized by netty_unix_util_initialize_wait_clock @@ -247,6 +259,14 @@ static jshort netty_kqueue_native_noteDisconnected(JNIEnv* env, jclass clazz) { return NOTE_DISCONNECTED; } +static jint netty_kqueue_bsdsocket_connectResumeOnReadWrite(JNIEnv *env) { + return CONNECT_RESUME_ON_READ_WRITE; +} + +static jint netty_kqueue_bsdsocket_connectDataIdempotent(JNIEnv *env) { + return CONNECT_DATA_IDEMPOTENT; +} + // JNI Method Registration Table Begin static const JNINativeMethod statically_referenced_fixed_method_table[] = { { "evfiltRead", "()S", (void *) netty_kqueue_native_evfiltRead }, @@ -262,7 +282,9 @@ static const JNINativeMethod statically_referenced_fixed_method_table[] = { { "evError", "()S", (void *) netty_kqueue_native_evError }, { "noteReadClosed", "()S", (void *) netty_kqueue_native_noteReadClosed }, { "noteConnReset", "()S", (void *) netty_kqueue_native_noteConnReset }, - { "noteDisconnected", "()S", (void *) netty_kqueue_native_noteDisconnected } + { "noteDisconnected", "()S", (void *) netty_kqueue_native_noteDisconnected }, + { "connectResumeOnReadWrite", "()I", (void *) netty_kqueue_bsdsocket_connectResumeOnReadWrite }, + { "connectDataIdempotent", "()I", (void *) netty_kqueue_bsdsocket_connectDataIdempotent } }; static const jint statically_referenced_fixed_method_table_size = sizeof(statically_referenced_fixed_method_table) / sizeof(statically_referenced_fixed_method_table[0]); static const JNINativeMethod fixed_method_table[] = { diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index 788b83a491..4c73e1769f 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -387,7 +387,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan final void readReadyFinally(ChannelConfig config) { maybeMoreDataToRead = allocHandle.maybeMoreDataToRead(); - if (allocHandle.isReadEOF() || (readPending && maybeMoreDataToRead)) { + if (allocHandle.isReadEOF() || readPending && maybeMoreDataToRead) { // trigger a read again as there may be something left to read and because of ET we // will not get notified again until we read everything from the socket // @@ -691,7 +691,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan socket.bind(localAddress); } - boolean connected = doConnect0(remoteAddress); + boolean connected = doConnect0(remoteAddress, localAddress); if (connected) { remote = remoteSocketAddr == null? remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress()); @@ -703,10 +703,10 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan return connected; } - private boolean doConnect0(SocketAddress remote) throws Exception { + protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { boolean success = false; try { - boolean connected = socket.connect(remote); + boolean connected = socket.connect(remoteAddress); if (!connected) { writeFilter(true); } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java index 4065882735..75a2e92ea1 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java @@ -16,13 +16,21 @@ package io.netty.channel.kqueue; import io.netty.channel.DefaultFileRegion; +import io.netty.channel.unix.IovArray; import io.netty.channel.unix.PeerCredentials; import io.netty.channel.unix.Socket; import java.io.IOException; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; import static io.netty.channel.kqueue.AcceptFilter.PLATFORM_UNSUPPORTED; +import static io.netty.channel.kqueue.Native.CONNECT_TCP_FASTOPEN; +import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE; import static io.netty.channel.unix.Errors.ioResult; +import static io.netty.channel.unix.NativeInetAddress.ipv4MappedIpv6Address; +import static io.netty.util.internal.ObjectUtil.checkNotNull; /** * A socket which provides access BSD native methods. @@ -34,6 +42,12 @@ final class BsdSocket extends Socket { private static final int APPLE_SND_LOW_AT_MAX = 1 << 17; private static final int FREEBSD_SND_LOW_AT_MAX = 1 << 15; static final int BSD_SND_LOW_AT_MAX = Math.min(APPLE_SND_LOW_AT_MAX, FREEBSD_SND_LOW_AT_MAX); + /** + * The `endpoints` structure passed to `connectx(2)` has an optional "source interface" field, + * which is the index of the network interface to use. + * According to `if_nametoindex(3)`, the value 0 is used when no interface is specified. + */ + private static final int UNSPECIFIED_SOURCE_INTERFACE = 0; BsdSocket(int fd) { super(fd); @@ -51,7 +65,7 @@ final class BsdSocket extends Socket { setSndLowAt(intValue(), lowAt); } - boolean isTcpNoPush() throws IOException { + boolean isTcpNoPush() throws IOException { return getTcpNoPush(intValue()) != 0; } @@ -80,6 +94,96 @@ final class BsdSocket extends Socket { return ioResult("sendfile", (int) res); } + /** + * Establish a connection to the given destination address, and send the given data to it. + * + * Note: This method relies on the {@code connectx(2)} system call, which is MacOS specific. + * + * @param source the source address we are connecting from. + * @param destination the destination address we are connecting to. + * @param data the data to copy to the kernel-side socket buffer. + * @param tcpFastOpen if {@code true}, set the flags needed to enable TCP FastOpen connecting. + * @return The number of bytes copied to the kernel-side socket buffer, or the number of bytes sent to the + * destination. This number is negative if connecting is left in an in-progress state, + * or positive if the connection was immediately established. + * @throws IOException if an IO error occurs, if the {@code data} is too big to send in one go, + * or if the system call is not supported on your platform. + */ + int connectx(InetSocketAddress source, InetSocketAddress destination, IovArray data, boolean tcpFastOpen) + throws IOException { + checkNotNull(destination, "Destination InetSocketAddress cannot be null."); + int flags = tcpFastOpen ? CONNECT_TCP_FASTOPEN : 0; + + boolean sourceIPv6; + byte[] sourceAddress; + int sourceScopeId; + int sourcePort; + if (source == null) { + sourceIPv6 = false; + sourceAddress = null; + sourceScopeId = 0; + sourcePort = 0; + } else { + InetAddress sourceInetAddress = source.getAddress(); + sourceIPv6 = sourceInetAddress instanceof Inet6Address; + if (sourceIPv6) { + sourceAddress = sourceInetAddress.getAddress(); + sourceScopeId = ((Inet6Address) sourceInetAddress).getScopeId(); + } else { + // convert to ipv4 mapped ipv6 address; + sourceScopeId = 0; + sourceAddress = ipv4MappedIpv6Address(sourceInetAddress.getAddress()); + } + sourcePort = source.getPort(); + } + + InetAddress destinationInetAddress = destination.getAddress(); + boolean destinationIPv6 = destinationInetAddress instanceof Inet6Address; + byte[] destinationAddress; + int destinationScopeId; + if (destinationIPv6) { + destinationAddress = destinationInetAddress.getAddress(); + destinationScopeId = ((Inet6Address) destinationInetAddress).getScopeId(); + } else { + // convert to ipv4 mapped ipv6 address; + destinationScopeId = 0; + destinationAddress = ipv4MappedIpv6Address(destinationInetAddress.getAddress()); + } + int destinationPort = destination.getPort(); + + long iovAddress; + int iovCount; + int iovDataLength; + if (data == null || data.count() == 0) { + iovAddress = 0; + iovCount = 0; + iovDataLength = 0; + } else { + iovAddress = data.memoryAddress(0); + iovCount = data.count(); + long size = data.size(); + if (size > Integer.MAX_VALUE) { + throw new IOException("IovArray.size() too big: " + size + " bytes."); + } + iovDataLength = (int) size; + } + + int result = connectx(intValue(), + UNSPECIFIED_SOURCE_INTERFACE, sourceIPv6, sourceAddress, sourceScopeId, sourcePort, + destinationIPv6, destinationAddress, destinationScopeId, destinationPort, + flags, iovAddress, iovCount, iovDataLength); + if (result == ERRNO_EINPROGRESS_NEGATIVE) { + // This is normal for non-blocking sockets. + // We'll know the connection has been established when the socket is selectable for writing. + // Tell the channel the data was written, so the outbound buffer can update its position. + return -iovDataLength; + } + if (result < 0) { + return ioResult("connectx", result); + } + return result; + } + public static BsdSocket newSocketStream() { return new BsdSocket(newSocketStream0()); } @@ -99,12 +203,32 @@ final class BsdSocket extends Socket { private static native long sendFile(int socketFd, DefaultFileRegion src, long baseOffset, long offset, long length) throws IOException; + /** + * @return If successful, zero or positive number of bytes transfered, otherwise negative errno. + */ + private static native int connectx( + int socketFd, + // sa_endpoints_t *endpoints: + int sourceInterface, + boolean sourceIPv6, byte[] sourceAddress, int sourceScopeId, int sourcePort, + boolean destinationIPv6, byte[] destinationAddress, int destinationScopeId, int destinationPort, + // sae_associd_t associd is reserved + int flags, + long iovAddress, int iovCount, int iovDataLength + // sae_connid_t *connid is reserved + ); + private static native String[] getAcceptFilter(int fd) throws IOException; + private static native int getTcpNoPush(int fd) throws IOException; + private static native int getSndLowAt(int fd) throws IOException; + private static native PeerCredentials getPeerCredentials(int fd) throws IOException; private static native void setAcceptFilter(int fd, String filterName, String filterArgs) throws IOException; + private static native void setTcpNoPush(int fd, int tcpNoPush) throws IOException; + private static native void setSndLowAt(int fd, int lowAt) throws IOException; } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java index 6410b30712..12a2767eec 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java @@ -15,14 +15,18 @@ */ package io.netty.channel.kqueue; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.EventLoop; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; +import io.netty.channel.unix.IovArray; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.internal.UnstableApi; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.concurrent.Executor; @UnstableApi @@ -64,6 +68,35 @@ public final class KQueueSocketChannel extends AbstractKQueueStreamChannel imple return (ServerSocketChannel) super.parent(); } + @Override + protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { + if (config.isTcpFastOpenConnect()) { + ChannelOutboundBuffer outbound = unsafe().outboundBuffer(); + outbound.addFlush(); + Object curr; + if ((curr = outbound.current()) instanceof ByteBuf) { + ByteBuf initialData = (ByteBuf) curr; + // Don't bother with TCP FastOpen if we don't have any initial data to send anyway. + if (initialData.isReadable()) { + IovArray iov = new IovArray(config.getAllocator().directBuffer()); + try { + iov.add(initialData, initialData.readerIndex(), initialData.readableBytes()); + int bytesSent = socket.connectx( + (InetSocketAddress) localAddress, (InetSocketAddress) remoteAddress, iov, true); + writeFilter(true); + outbound.removeBytes(Math.abs(bytesSent)); + // The `connectx` method returns a negative number if connection is in-progress. + // So we should return `true` to indicate that connection was established, if it's positive. + return bytesSent > 0; + } finally { + iov.release(); + } + } + } + } + return super.doConnect0(remoteAddress, localAddress); + } + @Override protected AbstractKQueueUnsafe newUnsafe() { return new KQueueSocketChannelUnsafe(); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannelConfig.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannelConfig.java index 143663bc60..1ad15083c0 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannelConfig.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannelConfig.java @@ -40,6 +40,7 @@ import static io.netty.channel.kqueue.KQueueChannelOption.TCP_NOPUSH; @UnstableApi public final class KQueueSocketChannelConfig extends KQueueDuplexChannelConfig implements SocketChannelConfig { + private volatile boolean tcpFastopen; KQueueSocketChannelConfig(KQueueSocketChannel channel) { super(channel); @@ -87,6 +88,9 @@ public final class KQueueSocketChannelConfig extends KQueueDuplexChannelConfig i if (option == TCP_NOPUSH) { return (T) Boolean.valueOf(isTcpNoPush()); } + if (option == ChannelOption.TCP_FASTOPEN_CONNECT) { + return (T) Boolean.valueOf(isTcpFastOpenConnect()); + } return super.getOption(option); } @@ -112,6 +116,8 @@ public final class KQueueSocketChannelConfig extends KQueueDuplexChannelConfig i setSndLowAt((Integer) value); } else if (option == TCP_NOPUSH) { setTcpNoPush((Boolean) value); + } else if (option == ChannelOption.TCP_FASTOPEN_CONNECT) { + setTcpFastOpenConnect((Boolean) value); } else { return super.setOption(option, value); } @@ -285,6 +291,21 @@ public final class KQueueSocketChannelConfig extends KQueueDuplexChannelConfig i } } + /** + * Enables client TCP fast open, if available. + */ + public KQueueSocketChannelConfig setTcpFastOpenConnect(boolean fastOpenConnect) { + tcpFastopen = fastOpenConnect; + return this; + } + + /** + * Returns {@code true} if TCP fast open is enabled, {@code false} otherwise. + */ + public boolean isTcpFastOpenConnect() { + return tcpFastopen; + } + @Override public KQueueSocketChannelConfig setRcvAllocTransportProvidesGuess(boolean transportProvidesGuess) { super.setRcvAllocTransportProvidesGuess(transportProvidesGuess); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueStaticallyReferencedJniMethods.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueStaticallyReferencedJniMethods.java index 2ed7f53d43..e6926d65fa 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueStaticallyReferencedJniMethods.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueStaticallyReferencedJniMethods.java @@ -46,4 +46,8 @@ final class KQueueStaticallyReferencedJniMethods { static native short evfiltWrite(); static native short evfiltUser(); static native short evfiltSock(); + + // Flags for connectx(2) + static native int connectResumeOnReadWrite(); + static native int connectDataIdempotent(); } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/Native.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/Native.java index ff3986f826..5f4c77fbc2 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/Native.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/Native.java @@ -29,6 +29,8 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import java.io.IOException; import java.nio.channels.FileChannel; +import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.connectDataIdempotent; +import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.connectResumeOnReadWrite; import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evAdd; import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evClear; import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evDelete; @@ -104,6 +106,11 @@ final class Native { static final short EVFILT_USER = evfiltUser(); static final short EVFILT_SOCK = evfiltSock(); + // Flags for connectx(2) + private static final int CONNECT_RESUME_ON_READ_WRITE = connectResumeOnReadWrite(); + private static final int CONNECT_DATA_IDEMPOTENT = connectDataIdempotent(); + static final int CONNECT_TCP_FASTOPEN = CONNECT_RESUME_ON_READ_WRITE | CONNECT_DATA_IDEMPOTENT; + static FileDescriptor newKQueue() { return new FileDescriptor(kqueueCreate()); } diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketChannelNotYetConnectedTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketChannelNotYetConnectedTest.java index b486024b6c..a9d93eef99 100644 --- a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketChannelNotYetConnectedTest.java +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketChannelNotYetConnectedTest.java @@ -24,6 +24,6 @@ import java.util.List; public class KQueueSocketChannelNotYetConnectedTest extends SocketChannelNotYetConnectedTest { @Override protected List> newFactories() { - return KQueueSocketTestPermutation.INSTANCE.clientSocket(); + return KQueueSocketTestPermutation.INSTANCE.clientSocketWithFastOpen(); } } diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketTestPermutation.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketTestPermutation.java index 18b2e7b68b..98d13d251f 100644 --- a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketTestPermutation.java +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketTestPermutation.java @@ -19,6 +19,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.MultithreadEventLoopGroup; @@ -60,7 +61,6 @@ class KQueueSocketTestPermutation extends SocketTestPermutation { return list; } - @SuppressWarnings("unchecked") @Override public List> serverSocket() { List> toReturn = new ArrayList<>(); @@ -73,20 +73,32 @@ class KQueueSocketTestPermutation extends SocketTestPermutation { return toReturn; } - @SuppressWarnings("unchecked") @Override public List> clientSocket() { - return Arrays.asList( - () -> new Bootstrap().group(KQUEUE_WORKER_GROUP).channel(KQueueSocketChannel.class), - () -> new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class) - ); + List> toReturn = new ArrayList>(); + + toReturn.add(() -> new Bootstrap().group(KQUEUE_WORKER_GROUP).channel(KQueueSocketChannel.class)); + toReturn.add(() -> new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class)); + + return toReturn; + } + + @Override + public List> clientSocketWithFastOpen() { + List> factories = clientSocket(); + + int insertIndex = factories.size() - 1; // Keep NIO fixture last. + factories.add(insertIndex, + () -> new Bootstrap().group(KQUEUE_WORKER_GROUP).channel(KQueueSocketChannel.class) + .option(ChannelOption.TCP_FASTOPEN_CONNECT, true)); + + return factories; } @Override public List> datagram( final InternetProtocolFamily family) { // Make the list of Bootstrap factories. - @SuppressWarnings("unchecked") List> bfs = Arrays.asList( () -> new Bootstrap().group(nioWorkerGroup).channelFactory(new ChannelFactory() { @Override diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KqueueWriteBeforeRegisteredTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KqueueWriteBeforeRegisteredTest.java index e623753ea8..1e7f14ced4 100644 --- a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KqueueWriteBeforeRegisteredTest.java +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KqueueWriteBeforeRegisteredTest.java @@ -25,6 +25,6 @@ public class KqueueWriteBeforeRegisteredTest extends WriteBeforeRegisteredTest { @Override protected List> newFactories() { - return KQueueSocketTestPermutation.INSTANCE.clientSocket(); + return KQueueSocketTestPermutation.INSTANCE.clientSocketWithFastOpen(); } } diff --git a/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java b/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java index 0452845803..a2cf22e163 100644 --- a/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java +++ b/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java @@ -52,7 +52,7 @@ public class Socket extends FileDescriptor { public Socket(int fd) { super(fd); - this.ipv6 = isIPv6(fd); + ipv6 = isIPv6(fd); } /** @@ -72,7 +72,7 @@ public class Socket extends FileDescriptor { // shutdown anything. This is because if the underlying FD is reused and we still have an object which // represents the previous incarnation of the FD we need to be sure we don't inadvertently shutdown the // "new" FD without explicitly having a change. - final int oldState = this.state; + final int oldState = state; if (isClosed(oldState)) { throw new ClosedChannelException(); }