Add support for client-side TCP FastOpen to KQueue MacOS (#11560)

Motivation:
The MacOS-specific `connectx(2)` system call make it possible to establish client-side connections with TCP FastOpen.

Modification:
Add support for TCP FastOpen to the KQueue transport, and add the `connectx(2)` system call to `BsdSocket`.

Result:
It's now possible to use TCP FastOpen when initiating connections on MacOS.
This commit is contained in:
Chris Vest 2021-08-12 13:38:46 +02:00 committed by GitHub
parent 699549dcbc
commit f750e2eb6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 315 additions and 43 deletions

View File

@ -28,11 +28,9 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.StringUtil;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.opentest4j.TestAbortedException;
import java.io.ByteArrayOutputStream;
import java.net.InetSocketAddress;
@ -189,8 +187,9 @@ public class SocketConnectTest extends AbstractSocketTest {
}
protected void enableTcpFastOpen(ServerBootstrap sb, Bootstrap cb) {
throw new TestAbortedException(
"Support for testing TCP_FASTOPEN not enabled for " + StringUtil.simpleClassName(this));
// TFO is an almost-pure optimisation and should not change any observable behaviour in our tests.
sb.option(ChannelOption.TCP_FASTOPEN, 5);
cb.option(ChannelOption.TCP_FASTOPEN_CONNECT, true);
}
private static void assertLocalAddress(InetSocketAddress address) {

View File

@ -29,10 +29,4 @@ public class EpollSocketConnectTest extends SocketConnectTest {
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen();
}
@Override
protected void enableTcpFastOpen(ServerBootstrap sb, Bootstrap cb) {
sb.option(ChannelOption.TCP_FASTOPEN, 5);
cb.option(ChannelOption.TCP_FASTOPEN_CONNECT, true);
}
}

View File

@ -68,7 +68,6 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
return list;
}
@SuppressWarnings("unchecked")
@Override
public List<BootstrapFactory<ServerBootstrap>> serverSocket() {
List<BootstrapFactory<ServerBootstrap>> toReturn = new ArrayList<BootstrapFactory<ServerBootstrap>>();

View File

@ -13,6 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
#include <assert.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
@ -83,6 +84,60 @@ static jlong netty_kqueue_bsdsocket_sendFile(JNIEnv* env, jclass clazz, jint soc
return res < 0 ? -err : 0;
}
static jint netty_kqueue_bsdsocket_connectx(JNIEnv* env, jclass clazz,
jint socketFd,
jint socketInterface,
jboolean sourceIPv6, jbyteArray sourceAddress, jint sourceScopeId, jint sourcePort,
jboolean destinationIPv6, jbyteArray destinationAddress, jint destinationScopeId, jint destinationPort,
jint flags,
jlong iovAddress, jint iovCount, jint iovDataLength) {
#ifdef __APPLE__ // connectx(2) is only defined on Darwin.
sa_endpoints_t endpoints;
endpoints.sae_srcif = (unsigned int) socketInterface;
endpoints.sae_srcaddr = NULL;
endpoints.sae_srcaddrlen = 0;
endpoints.sae_dstaddr = NULL;
endpoints.sae_dstaddrlen = 0;
struct sockaddr_storage srcaddr;
socklen_t srcaddrlen;
struct sockaddr_storage dstaddr;
socklen_t dstaddrlen;
if (NULL != sourceAddress) {
if (-1 == netty_unix_socket_initSockaddr(env,
sourceIPv6, sourceAddress, sourceScopeId, sourcePort, &srcaddr, &srcaddrlen)) {
netty_unix_errors_throwIOException(env,
"Source address specified, but could not be converted to sockaddr.");
return -EINVAL;
}
endpoints.sae_srcaddr = (const struct sockaddr*) &srcaddr;
endpoints.sae_srcaddrlen = srcaddrlen;
}
assert(destinationAddress != NULL); // Java side will ensure destination is never null.
if (-1 == netty_unix_socket_initSockaddr(env,
destinationIPv6, destinationAddress, destinationScopeId, destinationPort, &dstaddr, &dstaddrlen)) {
netty_unix_errors_throwIOException(env, "Destination address could not be converted to sockaddr.");
return -EINVAL;
}
endpoints.sae_dstaddr = (const struct sockaddr*) &dstaddr;
endpoints.sae_dstaddrlen = dstaddrlen;
int socket = (int) socketFd;
const struct iovec* iov = (const struct iovec*) iovAddress;
unsigned int iovcnt = (unsigned int) iovCount;
size_t len = (size_t) iovDataLength;
int result = connectx(socket, &endpoints, SAE_ASSOCID_ANY, flags, iov, iovcnt, &len, NULL);
if (result == -1) {
return -errno;
}
return (jint) len;
#else
return -ENOSYS;
#endif
}
static void netty_kqueue_bsdsocket_setAcceptFilter(JNIEnv* env, jclass clazz, jint fd, jstring afName, jstring afArg) {
#ifdef SO_ACCEPTFILTER
struct accept_filter_arg af;
@ -196,7 +251,8 @@ static const JNINativeMethod fixed_method_table[] = {
{ "setSndLowAt", "(II)V", (void *) netty_kqueue_bsdsocket_setSndLowAt },
{ "getAcceptFilter", "(I)[Ljava/lang/String;", (void *) netty_kqueue_bsdsocket_getAcceptFilter },
{ "getTcpNoPush", "(I)I", (void *) netty_kqueue_bsdsocket_getTcpNoPush },
{ "getSndLowAt", "(I)I", (void *) netty_kqueue_bsdsocket_getSndLowAt }
{ "getSndLowAt", "(I)I", (void *) netty_kqueue_bsdsocket_getSndLowAt },
{ "connectx", "(IIZ[BIIZ[BIIIJII)I", (void *) netty_kqueue_bsdsocket_connectx }
};
static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]);

View File

@ -60,6 +60,12 @@
#ifndef NOTE_DISCONNECTED
#define NOTE_DISCONNECTED 0x00001000
#endif /* NOTE_DISCONNECTED */
#ifndef CONNECT_RESUME_ON_READ_WRITE
#define CONNECT_RESUME_ON_READ_WRITE 0x1
#endif /* CONNECT_RESUME_ON_READ_WRITE */
#ifndef CONNECT_DATA_IDEMPOTENT
#define CONNECT_DATA_IDEMPOTENT 0x2
#endif /* CONNECT_DATA_IDEMPOTENT */
#else
#ifndef EVFILT_SOCK
#define EVFILT_SOCK 0 // Disabled
@ -73,6 +79,12 @@
#ifndef NOTE_DISCONNECTED
#define NOTE_DISCONNECTED 0
#endif /* NOTE_DISCONNECTED */
#ifndef CONNECT_RESUME_ON_READ_WRITE
#define CONNECT_RESUME_ON_READ_WRITE 0
#endif /* CONNECT_RESUME_ON_READ_WRITE */
#ifndef CONNECT_DATA_IDEMPOTENT
#define CONNECT_DATA_IDEMPOTENT 0
#endif /* CONNECT_DATA_IDEMPOTENT */
#endif /* __APPLE__ */
static clockid_t waitClockId = 0; // initialized by netty_unix_util_initialize_wait_clock
@ -247,6 +259,14 @@ static jshort netty_kqueue_native_noteDisconnected(JNIEnv* env, jclass clazz) {
return NOTE_DISCONNECTED;
}
static jint netty_kqueue_bsdsocket_connectResumeOnReadWrite(JNIEnv *env) {
return CONNECT_RESUME_ON_READ_WRITE;
}
static jint netty_kqueue_bsdsocket_connectDataIdempotent(JNIEnv *env) {
return CONNECT_DATA_IDEMPOTENT;
}
// JNI Method Registration Table Begin
static const JNINativeMethod statically_referenced_fixed_method_table[] = {
{ "evfiltRead", "()S", (void *) netty_kqueue_native_evfiltRead },
@ -262,7 +282,9 @@ static const JNINativeMethod statically_referenced_fixed_method_table[] = {
{ "evError", "()S", (void *) netty_kqueue_native_evError },
{ "noteReadClosed", "()S", (void *) netty_kqueue_native_noteReadClosed },
{ "noteConnReset", "()S", (void *) netty_kqueue_native_noteConnReset },
{ "noteDisconnected", "()S", (void *) netty_kqueue_native_noteDisconnected }
{ "noteDisconnected", "()S", (void *) netty_kqueue_native_noteDisconnected },
{ "connectResumeOnReadWrite", "()I", (void *) netty_kqueue_bsdsocket_connectResumeOnReadWrite },
{ "connectDataIdempotent", "()I", (void *) netty_kqueue_bsdsocket_connectDataIdempotent }
};
static const jint statically_referenced_fixed_method_table_size = sizeof(statically_referenced_fixed_method_table) / sizeof(statically_referenced_fixed_method_table[0]);
static const JNINativeMethod fixed_method_table[] = {

View File

@ -390,7 +390,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
final void readReadyFinally(ChannelConfig config) {
maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
if (allocHandle.isReadEOF() || (readPending && maybeMoreDataToRead)) {
if (allocHandle.isReadEOF() || readPending && maybeMoreDataToRead) {
// trigger a read again as there may be something left to read and because of ET we
// will not get notified again until we read everything from the socket
//
@ -699,7 +699,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
socket.bind(localAddress);
}
boolean connected = doConnect0(remoteAddress);
boolean connected = doConnect0(remoteAddress, localAddress);
if (connected) {
remote = remoteSocketAddr == null?
remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
@ -711,10 +711,10 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
return connected;
}
private boolean doConnect0(SocketAddress remote) throws Exception {
protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
boolean success = false;
try {
boolean connected = socket.connect(remote);
boolean connected = socket.connect(remoteAddress);
if (!connected) {
writeFilter(true);
}

View File

@ -16,13 +16,21 @@
package io.netty.channel.kqueue;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.PeerCredentials;
import io.netty.channel.unix.Socket;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import static io.netty.channel.kqueue.AcceptFilter.PLATFORM_UNSUPPORTED;
import static io.netty.channel.kqueue.Native.CONNECT_TCP_FASTOPEN;
import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
import static io.netty.channel.unix.Errors.ioResult;
import static io.netty.channel.unix.NativeInetAddress.ipv4MappedIpv6Address;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
/**
* A socket which provides access BSD native methods.
@ -34,6 +42,12 @@ final class BsdSocket extends Socket {
private static final int APPLE_SND_LOW_AT_MAX = 1 << 17;
private static final int FREEBSD_SND_LOW_AT_MAX = 1 << 15;
static final int BSD_SND_LOW_AT_MAX = Math.min(APPLE_SND_LOW_AT_MAX, FREEBSD_SND_LOW_AT_MAX);
/**
* The `endpoints` structure passed to `connectx(2)` has an optional "source interface" field,
* which is the index of the network interface to use.
* According to `if_nametoindex(3)`, the value 0 is used when no interface is specified.
*/
private static final int UNSPECIFIED_SOURCE_INTERFACE = 0;
BsdSocket(int fd) {
super(fd);
@ -51,7 +65,7 @@ final class BsdSocket extends Socket {
setSndLowAt(intValue(), lowAt);
}
boolean isTcpNoPush() throws IOException {
boolean isTcpNoPush() throws IOException {
return getTcpNoPush(intValue()) != 0;
}
@ -80,6 +94,96 @@ final class BsdSocket extends Socket {
return ioResult("sendfile", (int) res);
}
/**
* Establish a connection to the given destination address, and send the given data to it.
*
* <strong>Note:</strong> This method relies on the {@code connectx(2)} system call, which is MacOS specific.
*
* @param source the source address we are connecting from.
* @param destination the destination address we are connecting to.
* @param data the data to copy to the kernel-side socket buffer.
* @param tcpFastOpen if {@code true}, set the flags needed to enable TCP FastOpen connecting.
* @return The number of bytes copied to the kernel-side socket buffer, or the number of bytes sent to the
* destination. This number is <em>negative</em> if connecting is left in an in-progress state,
* or <em>positive</em> if the connection was immediately established.
* @throws IOException if an IO error occurs, if the {@code data} is too big to send in one go,
* or if the system call is not supported on your platform.
*/
int connectx(InetSocketAddress source, InetSocketAddress destination, IovArray data, boolean tcpFastOpen)
throws IOException {
checkNotNull(destination, "Destination InetSocketAddress cannot be null.");
int flags = tcpFastOpen ? CONNECT_TCP_FASTOPEN : 0;
boolean sourceIPv6;
byte[] sourceAddress;
int sourceScopeId;
int sourcePort;
if (source == null) {
sourceIPv6 = false;
sourceAddress = null;
sourceScopeId = 0;
sourcePort = 0;
} else {
InetAddress sourceInetAddress = source.getAddress();
sourceIPv6 = sourceInetAddress instanceof Inet6Address;
if (sourceIPv6) {
sourceAddress = sourceInetAddress.getAddress();
sourceScopeId = ((Inet6Address) sourceInetAddress).getScopeId();
} else {
// convert to ipv4 mapped ipv6 address;
sourceScopeId = 0;
sourceAddress = ipv4MappedIpv6Address(sourceInetAddress.getAddress());
}
sourcePort = source.getPort();
}
InetAddress destinationInetAddress = destination.getAddress();
boolean destinationIPv6 = destinationInetAddress instanceof Inet6Address;
byte[] destinationAddress;
int destinationScopeId;
if (destinationIPv6) {
destinationAddress = destinationInetAddress.getAddress();
destinationScopeId = ((Inet6Address) destinationInetAddress).getScopeId();
} else {
// convert to ipv4 mapped ipv6 address;
destinationScopeId = 0;
destinationAddress = ipv4MappedIpv6Address(destinationInetAddress.getAddress());
}
int destinationPort = destination.getPort();
long iovAddress;
int iovCount;
int iovDataLength;
if (data == null || data.count() == 0) {
iovAddress = 0;
iovCount = 0;
iovDataLength = 0;
} else {
iovAddress = data.memoryAddress(0);
iovCount = data.count();
long size = data.size();
if (size > Integer.MAX_VALUE) {
throw new IOException("IovArray.size() too big: " + size + " bytes.");
}
iovDataLength = (int) size;
}
int result = connectx(intValue(),
UNSPECIFIED_SOURCE_INTERFACE, sourceIPv6, sourceAddress, sourceScopeId, sourcePort,
destinationIPv6, destinationAddress, destinationScopeId, destinationPort,
flags, iovAddress, iovCount, iovDataLength);
if (result == ERRNO_EINPROGRESS_NEGATIVE) {
// This is normal for non-blocking sockets.
// We'll know the connection has been established when the socket is selectable for writing.
// Tell the channel the data was written, so the outbound buffer can update its position.
return -iovDataLength;
}
if (result < 0) {
return ioResult("connectx", result);
}
return result;
}
public static BsdSocket newSocketStream() {
return new BsdSocket(newSocketStream0());
}
@ -99,12 +203,32 @@ final class BsdSocket extends Socket {
private static native long sendFile(int socketFd, DefaultFileRegion src, long baseOffset,
long offset, long length) throws IOException;
/**
* @return If successful, zero or positive number of bytes transfered, otherwise negative errno.
*/
private static native int connectx(
int socketFd,
// sa_endpoints_t *endpoints:
int sourceInterface,
boolean sourceIPv6, byte[] sourceAddress, int sourceScopeId, int sourcePort,
boolean destinationIPv6, byte[] destinationAddress, int destinationScopeId, int destinationPort,
// sae_associd_t associd is reserved
int flags,
long iovAddress, int iovCount, int iovDataLength
// sae_connid_t *connid is reserved
);
private static native String[] getAcceptFilter(int fd) throws IOException;
private static native int getTcpNoPush(int fd) throws IOException;
private static native int getSndLowAt(int fd) throws IOException;
private static native PeerCredentials getPeerCredentials(int fd) throws IOException;
private static native void setAcceptFilter(int fd, String filterName, String filterArgs) throws IOException;
private static native void setTcpNoPush(int fd, int tcpNoPush) throws IOException;
private static native void setSndLowAt(int fd, int lowAt) throws IOException;
}

View File

@ -15,13 +15,17 @@
*/
package io.netty.channel.kqueue;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.unix.IovArray;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.UnstableApi;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executor;
@UnstableApi
@ -63,6 +67,35 @@ public final class KQueueSocketChannel extends AbstractKQueueStreamChannel imple
return (ServerSocketChannel) super.parent();
}
@Override
protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (config.isTcpFastOpenConnect()) {
ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
outbound.addFlush();
Object curr;
if ((curr = outbound.current()) instanceof ByteBuf) {
ByteBuf initialData = (ByteBuf) curr;
// Don't bother with TCP FastOpen if we don't have any initial data to send anyway.
if (initialData.isReadable()) {
IovArray iov = new IovArray(config.getAllocator().directBuffer());
try {
iov.add(initialData, initialData.readerIndex(), initialData.readableBytes());
int bytesSent = socket.connectx(
(InetSocketAddress) localAddress, (InetSocketAddress) remoteAddress, iov, true);
writeFilter(true);
outbound.removeBytes(Math.abs(bytesSent));
// The `connectx` method returns a negative number if connection is in-progress.
// So we should return `true` to indicate that connection was established, if it's positive.
return bytesSent > 0;
} finally {
iov.release();
}
}
}
}
return super.doConnect0(remoteAddress, localAddress);
}
@Override
protected AbstractKQueueUnsafe newUnsafe() {
return new KQueueSocketChannelUnsafe();

View File

@ -42,6 +42,7 @@ import static io.netty.channel.kqueue.KQueueChannelOption.TCP_NOPUSH;
@UnstableApi
public final class KQueueSocketChannelConfig extends KQueueChannelConfig implements SocketChannelConfig {
private volatile boolean allowHalfClosure;
private volatile boolean tcpFastopen;
KQueueSocketChannelConfig(KQueueSocketChannel channel) {
super(channel);
@ -92,6 +93,9 @@ public final class KQueueSocketChannelConfig extends KQueueChannelConfig impleme
if (option == TCP_NOPUSH) {
return (T) Boolean.valueOf(isTcpNoPush());
}
if (option == ChannelOption.TCP_FASTOPEN_CONNECT) {
return (T) Boolean.valueOf(isTcpFastOpenConnect());
}
return super.getOption(option);
}
@ -119,6 +123,8 @@ public final class KQueueSocketChannelConfig extends KQueueChannelConfig impleme
setSndLowAt((Integer) value);
} else if (option == TCP_NOPUSH) {
setTcpNoPush((Boolean) value);
} else if (option == ChannelOption.TCP_FASTOPEN_CONNECT) {
setTcpFastOpenConnect((Boolean) value);
} else {
return super.setOption(option, value);
}
@ -297,6 +303,21 @@ public final class KQueueSocketChannelConfig extends KQueueChannelConfig impleme
return allowHalfClosure;
}
/**
* Enables client TCP fast open, if available.
*/
public KQueueSocketChannelConfig setTcpFastOpenConnect(boolean fastOpenConnect) {
tcpFastopen = fastOpenConnect;
return this;
}
/**
* Returns {@code true} if TCP fast open is enabled, {@code false} otherwise.
*/
public boolean isTcpFastOpenConnect() {
return tcpFastopen;
}
@Override
public KQueueSocketChannelConfig setRcvAllocTransportProvidesGuess(boolean transportProvidesGuess) {
super.setRcvAllocTransportProvidesGuess(transportProvidesGuess);

View File

@ -46,4 +46,8 @@ final class KQueueStaticallyReferencedJniMethods {
static native short evfiltWrite();
static native short evfiltUser();
static native short evfiltSock();
// Flags for connectx(2)
static native int connectResumeOnReadWrite();
static native int connectDataIdempotent();
}

View File

@ -29,6 +29,8 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.nio.channels.FileChannel;
import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.connectDataIdempotent;
import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.connectResumeOnReadWrite;
import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evAdd;
import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evClear;
import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evDelete;
@ -104,6 +106,11 @@ final class Native {
static final short EVFILT_USER = evfiltUser();
static final short EVFILT_SOCK = evfiltSock();
// Flags for connectx(2)
private static final int CONNECT_RESUME_ON_READ_WRITE = connectResumeOnReadWrite();
private static final int CONNECT_DATA_IDEMPOTENT = connectDataIdempotent();
static final int CONNECT_TCP_FASTOPEN = CONNECT_RESUME_ON_READ_WRITE | CONNECT_DATA_IDEMPOTENT;
static FileDescriptor newKQueue() {
return new FileDescriptor(kqueueCreate());
}

View File

@ -24,6 +24,6 @@ import java.util.List;
public class KQueueSocketChannelNotYetConnectedTest extends SocketChannelNotYetConnectedTest {
@Override
protected List<TestsuitePermutation.BootstrapFactory<Bootstrap>> newFactories() {
return KQueueSocketTestPermutation.INSTANCE.clientSocket();
return KQueueSocketTestPermutation.INSTANCE.clientSocketWithFastOpen();
}
}

View File

@ -19,6 +19,7 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;
@ -30,8 +31,6 @@ import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory;
import io.netty.testsuite.transport.socket.SocketTestPermutation;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
@ -47,8 +46,6 @@ class KQueueSocketTestPermutation extends SocketTestPermutation {
static final EventLoopGroup KQUEUE_WORKER_GROUP =
new KQueueEventLoopGroup(WORKERS, new DefaultThreadFactory("testsuite-KQueue-worker", true));
private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueSocketTestPermutation.class);
@Override
public List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> socket() {
@ -60,7 +57,6 @@ class KQueueSocketTestPermutation extends SocketTestPermutation {
return list;
}
@SuppressWarnings("unchecked")
@Override
public List<BootstrapFactory<ServerBootstrap>> serverSocket() {
List<BootstrapFactory<ServerBootstrap>> toReturn = new ArrayList<BootstrapFactory<ServerBootstrap>>();
@ -83,30 +79,47 @@ class KQueueSocketTestPermutation extends SocketTestPermutation {
return toReturn;
}
@SuppressWarnings("unchecked")
@Override
public List<BootstrapFactory<Bootstrap>> clientSocket() {
return Arrays.asList(
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(KQUEUE_WORKER_GROUP).channel(KQueueSocketChannel.class);
}
},
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class);
}
}
);
List<BootstrapFactory<Bootstrap>> toReturn = new ArrayList<BootstrapFactory<Bootstrap>>();
toReturn.add(new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(KQUEUE_WORKER_GROUP).channel(KQueueSocketChannel.class);
}
});
toReturn.add(new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class);
}
});
return toReturn;
}
@Override
public List<BootstrapFactory<Bootstrap>> clientSocketWithFastOpen() {
List<BootstrapFactory<Bootstrap>> factories = clientSocket();
int insertIndex = factories.size() - 1; // Keep NIO fixture last.
factories.add(insertIndex, new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(KQUEUE_WORKER_GROUP).channel(KQueueSocketChannel.class)
.option(ChannelOption.TCP_FASTOPEN_CONNECT, true);
}
});
return factories;
}
@Override
public List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> datagram(
final InternetProtocolFamily family) {
// Make the list of Bootstrap factories.
@SuppressWarnings("unchecked")
List<BootstrapFactory<Bootstrap>> bfs = Arrays.asList(
new BootstrapFactory<Bootstrap>() {
@Override

View File

@ -25,6 +25,6 @@ public class KqueueWriteBeforeRegisteredTest extends WriteBeforeRegisteredTest {
@Override
protected List<TestsuitePermutation.BootstrapFactory<Bootstrap>> newFactories() {
return KQueueSocketTestPermutation.INSTANCE.clientSocket();
return KQueueSocketTestPermutation.INSTANCE.clientSocketWithFastOpen();
}
}

View File

@ -52,7 +52,7 @@ public class Socket extends FileDescriptor {
public Socket(int fd) {
super(fd);
this.ipv6 = isIPv6(fd);
ipv6 = isIPv6(fd);
}
/**
@ -72,7 +72,7 @@ public class Socket extends FileDescriptor {
// shutdown anything. This is because if the underlying FD is reused and we still have an object which
// represents the previous incarnation of the FD we need to be sure we don't inadvertently shutdown the
// "new" FD without explicitly having a change.
final int oldState = this.state;
final int oldState = state;
if (isClosed(oldState)) {
throw new ClosedChannelException();
}