Add support for RFC7413 on linux for server sockets

Motivation:

TCP Fast Open (TFO) allows data to be carried in the SYN and SYN-ACK packets and consumed by the receiving end during the initial connection handshake. This saves up to one full round-trip time (RTT) compared to standard TCP, which requires the three-way handshake (3WHS) to complete before data can be exchanged. This commit enables support for TFO on server sockets.

Modifications:

Added a new Integer option, TCP_FASTOPEN, to EpollChannelOption.
Added a getter and a setter for TCP_FASTOPEN to EpollServerChannelConfig.
Added a way in Native to check whether TCP_FASTOPEN is supported on the server side.
EpollServerSocketChannel.doBind now sets the TCP_FASTOPEN socket option when a value has been configured through the channel options.
Extended EpollSocketTestPermutation with a server-socket permutation that enables fast open.

Result:

Users of native-epoll can set TCP_FASTOPEN on server sockets and thus leverage the fast-connect feature of RFC 7413 when the client is capable of it.
This commit is contained in:
Peeyush Aggarwal 2015-09-04 15:03:10 -07:00 committed by Norman Maurer
parent e146873740
commit 24860e70cc
7 changed files with 156 additions and 16 deletions

View File

@ -26,6 +26,7 @@
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <linux/socket.h> // SOL_TCP definition
#include <unistd.h>
#include <arpa/inet.h>
#include <fcntl.h>
@ -40,6 +41,20 @@
#define TCP_NOTSENT_LOWAT 25
#endif
// Fallback definitions needed by the TCP Fast Open code in this file.
// _KERNEL_FASTOPEN acts as a guard so the fallbacks below are only evaluated once.
#ifndef _KERNEL_FASTOPEN
#define _KERNEL_FASTOPEN
// Define TCP_FASTOPEN only when the system headers did not already provide it
// (reported to be missing mostly on Ubuntu).
#ifndef TCP_FASTOPEN
#define TCP_FASTOPEN 23
#endif
// Define SOL_TCP only when the system headers did not already provide it
// (reported to be missing mostly on Ubuntu).
#ifndef SOL_TCP
#define SOL_TCP 6
#endif
#endif
/**
* On older Linux kernels, epoll can't handle timeout
* values bigger than (LONG_MAX - 999ULL)/HZ.
@ -136,6 +151,20 @@ void throwOutOfMemoryError(JNIEnv* env) {
(*env)->ThrowNew(env, exceptionClass, "");
}
/**
 * Reads the first line of the file named by property and stores its integer
 * value (as parsed by atoi) in *returnValue.
 *
 * Returns 0 when a line could be read, -1 when the file could not be opened
 * or contained no data. *returnValue is left untouched on failure.
 */
static int getSysctlValue(const char * property, int* returnValue) {
    char line[32] = {0x0};
    int status = -1;
    FILE *stream = fopen(property, "r");

    if (stream == NULL) {
        // Nothing to close; report failure without touching *returnValue.
        return -1;
    }
    if (fgets(line, sizeof(line), stream) != NULL) {
        *returnValue = atoi(line);
        status = 0;
    }
    fclose(stream);
    return status;
}
/** Notice: every usage of exceptionMessage needs to release the allocated memory for the sequence of char */
char* exceptionMessage(char* msg, int error) {
if (error < 0) {
@ -1234,6 +1263,10 @@ JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv* env
setOption(env, fd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
}
/**
 * JNI entry point for Native.setTcpFastopen(int fd, int tcpFastopenBacklog).
 *
 * Forwards optval unchanged to this file's setOption helper as the value of
 * the TCP_FASTOPEN option for the socket identified by fd.
 */
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpFastopen(JNIEnv* env, jclass clazz, jint fd, jint optval) {
    // Use IPPROTO_TCP as the option level, like the other TCP option setters in this
    // file (TCP_CORK, TCP_NOTSENT_LOWAT). It has the same value as SOL_TCP on Linux
    // and comes from <netinet/in.h>, so no fallback define is required.
    setOption(env, fd, IPPROTO_TCP, TCP_FASTOPEN, &optval, sizeof(optval));
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpNotSentLowAt(JNIEnv* env, jclass clazz, jint fd, jint optval) {
setOption(env, fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &optval, sizeof(optval));
}
@ -1479,6 +1512,15 @@ JNIEXPORT jboolean JNICALL Java_io_netty_channel_epoll_Native_isSupportingSendmm
return JNI_FALSE;
}
/**
 * JNI entry point for Native.isSupportingTcpFastopen().
 *
 * Reads /proc/sys/net/ipv4/tcp_fastopen and reports whether server-side
 * TCP Fast Open is enabled. The result is only used to decide whether a
 * listening socket gets the TCP_FASTOPEN option, so only the server bit matters.
 *
 * Returns JNI_TRUE when the server bit is set, JNI_FALSE otherwise (including
 * when the sysctl file is missing or unreadable).
 */
JNIEXPORT jboolean JNICALL Java_io_netty_channel_epoll_Native_isSupportingTcpFastopen(JNIEnv* env, jclass clazz) {
    int fastopen = 0;
    if (getSysctlValue("/proc/sys/net/ipv4/tcp_fastopen", &fastopen) != 0) {
        // The sysctl could not be read: treat TCP Fast Open as unsupported.
        return JNI_FALSE;
    }
    // The sysctl is a bit mask: 0x1 enables the client side, 0x2 the server side.
    // A plain "> 0" check would report support when only the client bit is set.
    if ((fastopen & 0x2) != 0) {
        return JNI_TRUE;
    }
    return JNI_FALSE;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_errnoEBADF(JNIEnv* env, jclass clazz) {
return EBADF;
}

View File

@ -80,6 +80,7 @@ void Java_io_netty_channel_epoll_Native_setReceiveBufferSize(JNIEnv* env, jclass
void Java_io_netty_channel_epoll_Native_setSendBufferSize(JNIEnv* env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setKeepAlive(JNIEnv* env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv* env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setTcpFastopen(JNIEnv* env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setTcpNotSentLowAt(JNIEnv* env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setSoLinger(JNIEnv* env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setTrafficClass(JNIEnv* env, jclass clazz, jint fd, jint optval);
@ -110,6 +111,7 @@ jint Java_io_netty_channel_epoll_Native_iovMax(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_uioMaxIov(JNIEnv* env, jclass clazz);
jlong Java_io_netty_channel_epoll_Native_ssizeMax(JNIEnv* env, jclass clazz);
jboolean Java_io_netty_channel_epoll_Native_isSupportingSendmmsg(JNIEnv* env, jclass clazz);
jboolean Java_io_netty_channel_epoll_Native_isSupportingTcpFastopen(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_errnoEBADF(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_errnoEPIPE(JNIEnv* env, jclass clazz);

View File

@ -31,6 +31,7 @@ public final class EpollChannelOption<T> extends ChannelOption<T> {
public static final ChannelOption<Integer> TCP_KEEPCNT = valueOf("TCP_KEEPCNT");
public static final ChannelOption<Integer> TCP_USER_TIMEOUT = valueOf("TCP_USER_TIMEOUT");
public static final ChannelOption<Boolean> IP_FREEBIND = valueOf("IP_FREEBIND");
public static final ChannelOption<Integer> TCP_FASTOPEN = valueOf("TCP_FASTOPEN");
public static final ChannelOption<DomainSocketReadMode> DOMAIN_SOCKET_READ_MODE =
valueOf("DOMAIN_SOCKET_READ_MODE");

View File

@ -16,6 +16,7 @@
package io.netty.channel.epoll;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
@ -32,6 +33,7 @@ import static io.netty.channel.epoll.EpollChannelOption.TCP_MD5SIG;;
public class EpollServerChannelConfig extends EpollChannelConfig {
protected final AbstractEpollChannel channel;
private volatile int backlog = NetUtil.SOMAXCONN;
private volatile int pendingFastOpenRequestsThreshold;
EpollServerChannelConfig(AbstractEpollChannel channel) {
super(channel);
@ -40,7 +42,7 @@ public class EpollServerChannelConfig extends EpollChannelConfig {
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG);
return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG, EpollChannelOption.TCP_FASTOPEN);
}
@SuppressWarnings("unchecked")
@ -55,6 +57,9 @@ public class EpollServerChannelConfig extends EpollChannelConfig {
if (option == SO_BACKLOG) {
return (T) Integer.valueOf(getBacklog());
}
if (option == EpollChannelOption.TCP_FASTOPEN) {
return (T) Integer.valueOf(getTcpFastopen());
}
return super.getOption(option);
}
@ -72,6 +77,8 @@ public class EpollServerChannelConfig extends EpollChannelConfig {
@SuppressWarnings("unchecked")
final Map<InetAddress, byte[]> m = (Map<InetAddress, byte[]>) value;
((EpollServerSocketChannel) channel).setTcpMd5Sig(m);
} else if (option == EpollChannelOption.TCP_FASTOPEN) {
setTcpFastopen((Integer) value);
} else {
return super.setOption(option, value);
}
@ -109,6 +116,32 @@ public class EpollServerChannelConfig extends EpollChannelConfig {
return this;
}
/**
 * Returns the currently configured threshold for the number of pending TCP Fast Open requests.
 * The value is {@code 0} until a threshold has been set on this config.
 *
 * @return the pending fast open requests threshold
 * @see <a href="https://tools.ietf.org/html/rfc7413#appendix-A.2">RFC 7413 Passive Open</a>
 */
public int getTcpFastopen() {
    return this.pendingFastOpenRequestsThreshold;
}
/**
 * Enables TCP Fast Open on the server channel. If the underlying OS doesn't support TCP_FASTOPEN, setting this has
 * no effect. This has to be set before the socket is bound, because the option is applied to the socket right
 * before it starts listening; setting it later has no effect.
 *
 * @param pendingFastOpenRequestsThreshold number of requests allowed to be pending for fast open at a given point
 * in time, for security; must not be negative.
 * See <a href="https://tools.ietf.org/html/rfc7413#appendix-A.2">RFC 7413 Passive Open</a>.
 * @return this config instance, to allow call chaining
 * @throws IllegalArgumentException if {@code pendingFastOpenRequestsThreshold} is negative
 *
 * @see <a href="https://tools.ietf.org/html/rfc7413">RFC 7413 TCP FastOpen</a>
 */
public EpollServerChannelConfig setTcpFastopen(int pendingFastOpenRequestsThreshold) {
    // Validate the argument itself. Checking the field here would test the previously stored
    // value and let negative thresholds through.
    if (pendingFastOpenRequestsThreshold < 0) {
        throw new IllegalArgumentException("pendingFastOpenRequestsThreshold: " + pendingFastOpenRequestsThreshold);
    }
    this.pendingFastOpenRequestsThreshold = pendingFastOpenRequestsThreshold;
    return this;
}
@Override
public EpollServerChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
super.setConnectTimeoutMillis(connectTimeoutMillis);

View File

@ -66,6 +66,9 @@ public final class EpollServerSocketChannel extends AbstractEpollServerChannel i
int fd = fd().intValue();
Native.bind(fd, addr);
local = Native.localAddress(fd);
if (Native.IS_SUPPORTING_TCP_FASTOPEN && config.getTcpFastopen() > 0) {
Native.setTcpFastopen(fd, config.getTcpFastopen());
}
Native.listen(fd, config.getBacklog());
active = true;
}

View File

@ -61,6 +61,7 @@ public final class Native {
public static final int IOV_MAX = iovMax();
public static final int UIO_MAX_IOV = uioMaxIov();
public static final boolean IS_SUPPORTING_SENDMMSG = isSupportingSendmmsg();
public static final boolean IS_SUPPORTING_TCP_FASTOPEN = isSupportingTcpFastopen();
public static final long SSIZE_MAX = ssizeMax();
public static final int TCP_MD5SIG_MAXKEYLEN = tcpMd5SigMaxKeyLen();
@ -398,6 +399,7 @@ public final class Native {
int fd, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, int offset, int len);
private static native boolean isSupportingSendmmsg();
private static native boolean isSupportingTcpFastopen();
// socket operations
public static int socketStreamFd() {
@ -648,6 +650,7 @@ public final class Native {
public static native void setSendBufferSize(int fd, int sendBufferSize);
public static native void setTcpNoDelay(int fd, int tcpNoDelay);
public static native void setTcpCork(int fd, int tcpCork);
public static native void setTcpFastopen(int fd, int tcpFastopenBacklog);
public static native void setTcpNotSentLowAt(int fd, int tcpNotSentLowAt);
public static native void setSoLinger(int fd, int soLinger);
public static native void setTrafficClass(int fd, int tcpNoDelay);

View File

@ -29,9 +29,16 @@ import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory;
import io.netty.testsuite.transport.socket.SocketTestPermutation;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -45,6 +52,8 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
static final EventLoopGroup EPOLL_WORKER_GROUP =
new EpollEventLoopGroup(WORKERS, new DefaultThreadFactory("testsuite-epoll-worker", true));
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollSocketTestPermutation.class);
@Override
public List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> socket() {
@ -59,22 +68,34 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
@SuppressWarnings("unchecked")
@Override
public List<BootstrapFactory<ServerBootstrap>> serverSocket() {
return Arrays.asList(
new BootstrapFactory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().group(EPOLL_BOSS_GROUP, EPOLL_WORKER_GROUP)
.channel(EpollServerSocketChannel.class);
}
},
new BootstrapFactory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().group(nioBossGroup, nioWorkerGroup)
.channel(NioServerSocketChannel.class);
}
List<BootstrapFactory<ServerBootstrap>> toReturn = new ArrayList<BootstrapFactory<ServerBootstrap>>();
toReturn.add(new BootstrapFactory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().group(EPOLL_BOSS_GROUP, EPOLL_WORKER_GROUP)
.channel(EpollServerSocketChannel.class);
}
});
if (isServerFastOpen()) {
toReturn.add(new BootstrapFactory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
ServerBootstrap serverBootstrap = new ServerBootstrap().group(EPOLL_BOSS_GROUP, EPOLL_WORKER_GROUP)
.channel(EpollServerSocketChannel.class);
serverBootstrap.option(EpollChannelOption.TCP_FASTOPEN, 5);
return serverBootstrap;
}
);
});
}
toReturn.add(new BootstrapFactory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().group(nioBossGroup, nioWorkerGroup)
.channel(NioServerSocketChannel.class);
}
});
return toReturn;
}
@SuppressWarnings("unchecked")
@ -157,6 +178,41 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
);
}
/**
 * Checks whether server-side TCP Fast Open is enabled on this host by reading
 * {@code /proc/sys/net/ipv4/tcp_fastopen} inside a privileged action.
 *
 * @return {@code true} if the sysctl value has the server bit ({@code 0x2}) set; {@code false} if the bit is not
 * set or the file is missing, empty or unparsable
 */
public boolean isServerFastOpen() {
    final int mode = AccessController.doPrivileged(new PrivilegedAction<Integer>() {
        @Override
        public Integer run() {
            int fastopen = 0;
            File file = new File("/proc/sys/net/ipv4/tcp_fastopen");
            if (file.exists()) {
                BufferedReader in = null;
                try {
                    in = new BufferedReader(new FileReader(file));
                    String line = in.readLine();
                    // An empty file yields null; keep the default of 0 in that case.
                    if (line != null) {
                        fastopen = Integer.parseInt(line.trim());
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug("{}: {}", file, fastopen);
                    }
                } catch (Exception e) {
                    // Best effort: any read or parse problem means "not enabled".
                    logger.debug("Failed to get TCP_FASTOPEN from: {}", file, e);
                } finally {
                    if (in != null) {
                        try {
                            in.close();
                        } catch (IOException ignored) {
                            // Nothing useful can be done if closing the reader fails.
                        }
                    }
                }
            } else {
                if (logger.isDebugEnabled()) {
                    logger.debug("{}: {} (non-existent)", file, fastopen);
                }
            }
            return fastopen;
        }
    });
    // The sysctl is a bit mask (0x1 = client, 0x2 = server). Comparing against the exact value 3
    // would reject other settings that also enable the server side, such as 2 or 7.
    return (mode & 0x2) != 0;
}
public static DomainSocketAddress newSocketAddress() {
try {
File file = File.createTempFile("netty", "dsocket");