Add support for RFC7413 on linux for server sockets

Motivation:

TCP Fast Open allows data to be carried in the SYN and SYN-ACK packets and consumed by the receiving end during the initial connection handshake, and saves up to one full round-trip time (RTT) compared to the standard TCP, which requires a three-way handshake (3WHS) to complete before data can be exchanged. This commit enables support for TFO on server sockets.

Modifications:

Added new Integer Option TCP_FASTOPEN in EpollChannelOption.
Added getters/setters in EpollServerChannelConfig for TCP_FASTOPEN.
Added way to check if TCP_FASTOPEN is supported on server in Native.
Added setting on socket opt TCP_FASTOPEN if value is set on channel options in doBind in EpollServerSocketChannel.
Enhanced EpollSocketTestPermutation to contain a permutation for server socket containing fast open.

Result:

Users of native-epoll can set TCP_FASTOPEN on server sockets and thus leverage fast connect features of RFC7413 if client is capable of it.

Conflicts:
	transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java

Conflicts:
	transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java
This commit is contained in:
Peeyush Aggarwal 2015-09-04 15:03:10 -07:00 committed by Norman Maurer
parent 6c022ef86f
commit c449ceac3a
7 changed files with 157 additions and 17 deletions

View File

@ -26,6 +26,7 @@
#include <netinet/in.h> #include <netinet/in.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <linux/socket.h> // SOL_TCP definition
#include <unistd.h> #include <unistd.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <fcntl.h> #include <fcntl.h>
@ -40,6 +41,20 @@
#define TCP_NOTSENT_LOWAT 25 #define TCP_NOTSENT_LOWAT 25
#endif #endif
#ifndef _KERNEL_FASTOPEN
#define _KERNEL_FASTOPEN
// conditional define for TCP_FASTOPEN mostly on ubuntu
#ifndef TCP_FASTOPEN
#define TCP_FASTOPEN 23
#endif
// conditional define for SOL_TCP mostly on ubuntu
#ifndef SOL_TCP
#define SOL_TCP 6
#endif
#endif
/** /**
* On older Linux kernels, epoll can't handle timeout * On older Linux kernels, epoll can't handle timeout
* values bigger than (LONG_MAX - 999ULL)/HZ. * values bigger than (LONG_MAX - 999ULL)/HZ.
@ -136,6 +151,20 @@ void throwOutOfMemoryError(JNIEnv* env) {
(*env)->ThrowNew(env, exceptionClass, ""); (*env)->ThrowNew(env, exceptionClass, "");
} }
static int getSysctlValue(const char * property, int* returnValue) {
int rc = -1;
FILE *fd=fopen(property, "r");
if (fd != NULL) {
char buf[32] = {0x0};
if (fgets(buf, 32, fd) != NULL) {
*returnValue = atoi(buf);
rc = 0;
}
fclose(fd);
}
return rc;
}
/** Notice: every usage of exceptionMessage needs to release the allocated memory for the sequence of char */ /** Notice: every usage of exceptionMessage needs to release the allocated memory for the sequence of char */
char* exceptionMessage(char* msg, int error) { char* exceptionMessage(char* msg, int error) {
if (error < 0) { if (error < 0) {
@ -1234,6 +1263,10 @@ JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv* env
setOption(env, fd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval)); setOption(env, fd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
} }
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpFastopen(JNIEnv* env, jclass clazz, jint fd, jint optval) {
setOption(env, fd, SOL_TCP, TCP_FASTOPEN, &optval, sizeof(optval));
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpNotSentLowAt(JNIEnv* env, jclass clazz, jint fd, jint optval) { JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpNotSentLowAt(JNIEnv* env, jclass clazz, jint fd, jint optval) {
setOption(env, fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &optval, sizeof(optval)); setOption(env, fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &optval, sizeof(optval));
} }
@ -1479,6 +1512,15 @@ JNIEXPORT jboolean JNICALL Java_io_netty_channel_epoll_Native_isSupportingSendmm
return JNI_FALSE; return JNI_FALSE;
} }
JNIEXPORT jboolean JNICALL Java_io_netty_channel_epoll_Native_isSupportingTcpFastopen(JNIEnv* env, jclass clazz) {
int fastopen = 0;
getSysctlValue("/proc/sys/net/ipv4/tcp_fastopen", &fastopen);
if (fastopen > 0) {
return JNI_TRUE;
}
return JNI_FALSE;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_errnoEBADF(JNIEnv* env, jclass clazz) { JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_errnoEBADF(JNIEnv* env, jclass clazz) {
return EBADF; return EBADF;
} }

View File

@ -80,6 +80,7 @@ void Java_io_netty_channel_epoll_Native_setReceiveBufferSize(JNIEnv* env, jclass
void Java_io_netty_channel_epoll_Native_setSendBufferSize(JNIEnv* env, jclass clazz, jint fd, jint optval); void Java_io_netty_channel_epoll_Native_setSendBufferSize(JNIEnv* env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setKeepAlive(JNIEnv* env, jclass clazz, jint fd, jint optval); void Java_io_netty_channel_epoll_Native_setKeepAlive(JNIEnv* env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv* env, jclass clazz, jint fd, jint optval); void Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv* env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setTcpFastopen(JNIEnv* env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setTcpNotSentLowAt(JNIEnv* env, jclass clazz, jint fd, jint optval); void Java_io_netty_channel_epoll_Native_setTcpNotSentLowAt(JNIEnv* env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setSoLinger(JNIEnv* env, jclass clazz, jint fd, jint optval); void Java_io_netty_channel_epoll_Native_setSoLinger(JNIEnv* env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setTrafficClass(JNIEnv* env, jclass clazz, jint fd, jint optval); void Java_io_netty_channel_epoll_Native_setTrafficClass(JNIEnv* env, jclass clazz, jint fd, jint optval);
@ -110,6 +111,7 @@ jint Java_io_netty_channel_epoll_Native_iovMax(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_uioMaxIov(JNIEnv* env, jclass clazz); jint Java_io_netty_channel_epoll_Native_uioMaxIov(JNIEnv* env, jclass clazz);
jlong Java_io_netty_channel_epoll_Native_ssizeMax(JNIEnv* env, jclass clazz); jlong Java_io_netty_channel_epoll_Native_ssizeMax(JNIEnv* env, jclass clazz);
jboolean Java_io_netty_channel_epoll_Native_isSupportingSendmmsg(JNIEnv* env, jclass clazz); jboolean Java_io_netty_channel_epoll_Native_isSupportingSendmmsg(JNIEnv* env, jclass clazz);
jboolean Java_io_netty_channel_epoll_Native_isSupportingTcpFastopen(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_errnoEBADF(JNIEnv* env, jclass clazz); jint Java_io_netty_channel_epoll_Native_errnoEBADF(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_errnoEPIPE(JNIEnv* env, jclass clazz); jint Java_io_netty_channel_epoll_Native_errnoEPIPE(JNIEnv* env, jclass clazz);

View File

@ -33,7 +33,8 @@ public final class EpollChannelOption {
public static final ChannelOption<Integer> TCP_KEEPCNT = ChannelOption.valueOf(T, "TCP_KEEPCNT"); public static final ChannelOption<Integer> TCP_KEEPCNT = ChannelOption.valueOf(T, "TCP_KEEPCNT");
public static final ChannelOption<Integer> TCP_USER_TIMEOUT = ChannelOption.valueOf(T, "TCP_USER_TIMEOUT"); public static final ChannelOption<Integer> TCP_USER_TIMEOUT = ChannelOption.valueOf(T, "TCP_USER_TIMEOUT");
public static final ChannelOption<Map<InetAddress, byte[]>> TCP_MD5SIG = ChannelOption.valueOf(T, "TCP_MD5SIG"); public static final ChannelOption<Map<InetAddress, byte[]>> TCP_MD5SIG = ChannelOption.valueOf(T, "TCP_MD5SIG");
public static final ChannelOption<Boolean> IP_FREEBIND = ChannelOption.valueOf("IP_FREEBIND"); public static final ChannelOption<Boolean> IP_FREEBIND = ChannelOption.valueOf(T, "IP_FREEBIND");
public static final ChannelOption<Integer> TCP_FASTOPEN = ChannelOption.valueOf(T, "TCP_FASTOPEN");
public static final ChannelOption<DomainSocketReadMode> DOMAIN_SOCKET_READ_MODE = public static final ChannelOption<DomainSocketReadMode> DOMAIN_SOCKET_READ_MODE =
ChannelOption.valueOf(T, "DOMAIN_SOCKET_READ_MODE"); ChannelOption.valueOf(T, "DOMAIN_SOCKET_READ_MODE");

View File

@ -16,6 +16,7 @@
package io.netty.channel.epoll; package io.netty.channel.epoll;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator; import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
@ -32,6 +33,7 @@ import static io.netty.channel.epoll.EpollChannelOption.TCP_MD5SIG;;
public class EpollServerChannelConfig extends EpollChannelConfig { public class EpollServerChannelConfig extends EpollChannelConfig {
protected final AbstractEpollChannel channel; protected final AbstractEpollChannel channel;
private volatile int backlog = NetUtil.SOMAXCONN; private volatile int backlog = NetUtil.SOMAXCONN;
private volatile int pendingFastOpenRequestsThreshold;
EpollServerChannelConfig(AbstractEpollChannel channel) { EpollServerChannelConfig(AbstractEpollChannel channel) {
super(channel); super(channel);
@ -40,7 +42,7 @@ public class EpollServerChannelConfig extends EpollChannelConfig {
@Override @Override
public Map<ChannelOption<?>, Object> getOptions() { public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG); return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG, EpollChannelOption.TCP_FASTOPEN);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -55,6 +57,9 @@ public class EpollServerChannelConfig extends EpollChannelConfig {
if (option == SO_BACKLOG) { if (option == SO_BACKLOG) {
return (T) Integer.valueOf(getBacklog()); return (T) Integer.valueOf(getBacklog());
} }
if (option == EpollChannelOption.TCP_FASTOPEN) {
return (T) Integer.valueOf(getTcpFastopen());
}
return super.getOption(option); return super.getOption(option);
} }
@ -72,6 +77,8 @@ public class EpollServerChannelConfig extends EpollChannelConfig {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final Map<InetAddress, byte[]> m = (Map<InetAddress, byte[]>) value; final Map<InetAddress, byte[]> m = (Map<InetAddress, byte[]>) value;
((EpollServerSocketChannel) channel).setTcpMd5Sig(m); ((EpollServerSocketChannel) channel).setTcpMd5Sig(m);
} else if (option == EpollChannelOption.TCP_FASTOPEN) {
setTcpFastopen((Integer) value);
} else { } else {
return super.setOption(option, value); return super.setOption(option, value);
} }
@ -109,6 +116,32 @@ public class EpollServerChannelConfig extends EpollChannelConfig {
return this; return this;
} }
/**
* Returns threshold value of number of pending for fast open connect.
*
* @see <a href="https://tools.ietf.org/html/rfc7413#appendix-A.2">RFC 7413 Passive Open</a>
*/
public int getTcpFastopen() {
return pendingFastOpenRequestsThreshold;
}
/**
* Enables tcpFastOpen on the server channel. If the underlying os doesnt support TCP_FASTOPEN setting this has no
* effect. This has to be set before doing listen on the socket otherwise this takes no effect.
*
* @param pendingFastOpenRequestsThreshold number of requests to be pending for fastopen at a given point in time
* for security. @see <a href="https://tools.ietf.org/html/rfc7413#appendix-A.2">RFC 7413 Passive Open</a>
*
* @see <a href="https://tools.ietf.org/html/rfc7413">RFC 7413 TCP FastOpen</a>
*/
public EpollServerChannelConfig setTcpFastopen(int pendingFastOpenRequestsThreshold) {
if (this.pendingFastOpenRequestsThreshold < 0) {
throw new IllegalArgumentException("pendingFastOpenRequestsThreshold: " + pendingFastOpenRequestsThreshold);
}
this.pendingFastOpenRequestsThreshold = pendingFastOpenRequestsThreshold;
return this;
}
@Override @Override
public EpollServerChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { public EpollServerChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
super.setConnectTimeoutMillis(connectTimeoutMillis); super.setConnectTimeoutMillis(connectTimeoutMillis);

View File

@ -66,6 +66,9 @@ public final class EpollServerSocketChannel extends AbstractEpollServerChannel i
int fd = fd().intValue(); int fd = fd().intValue();
Native.bind(fd, addr); Native.bind(fd, addr);
local = Native.localAddress(fd); local = Native.localAddress(fd);
if (Native.IS_SUPPORTING_TCP_FASTOPEN && config.getTcpFastopen() > 0) {
Native.setTcpFastopen(fd, config.getTcpFastopen());
}
Native.listen(fd, config.getBacklog()); Native.listen(fd, config.getBacklog());
active = true; active = true;
} }

View File

@ -61,6 +61,7 @@ public final class Native {
public static final int IOV_MAX = iovMax(); public static final int IOV_MAX = iovMax();
public static final int UIO_MAX_IOV = uioMaxIov(); public static final int UIO_MAX_IOV = uioMaxIov();
public static final boolean IS_SUPPORTING_SENDMMSG = isSupportingSendmmsg(); public static final boolean IS_SUPPORTING_SENDMMSG = isSupportingSendmmsg();
public static final boolean IS_SUPPORTING_TCP_FASTOPEN = isSupportingTcpFastopen();
public static final long SSIZE_MAX = ssizeMax(); public static final long SSIZE_MAX = ssizeMax();
public static final int TCP_MD5SIG_MAXKEYLEN = tcpMd5SigMaxKeyLen(); public static final int TCP_MD5SIG_MAXKEYLEN = tcpMd5SigMaxKeyLen();
@ -398,6 +399,7 @@ public final class Native {
int fd, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, int offset, int len); int fd, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, int offset, int len);
private static native boolean isSupportingSendmmsg(); private static native boolean isSupportingSendmmsg();
private static native boolean isSupportingTcpFastopen();
// socket operations // socket operations
public static int socketStreamFd() { public static int socketStreamFd() {
@ -648,6 +650,7 @@ public final class Native {
public static native void setSendBufferSize(int fd, int sendBufferSize); public static native void setSendBufferSize(int fd, int sendBufferSize);
public static native void setTcpNoDelay(int fd, int tcpNoDelay); public static native void setTcpNoDelay(int fd, int tcpNoDelay);
public static native void setTcpCork(int fd, int tcpCork); public static native void setTcpCork(int fd, int tcpCork);
public static native void setTcpFastopen(int fd, int tcpFastopenBacklog);
public static native void setTcpNotSentLowAt(int fd, int tcpNotSentLowAt); public static native void setTcpNotSentLowAt(int fd, int tcpNotSentLowAt);
public static native void setSoLinger(int fd, int soLinger); public static native void setSoLinger(int fd, int soLinger);
public static native void setTrafficClass(int fd, int tcpNoDelay); public static native void setTrafficClass(int fd, int tcpNoDelay);

View File

@ -29,9 +29,16 @@ import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory; import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory;
import io.netty.testsuite.transport.socket.SocketTestPermutation; import io.netty.testsuite.transport.socket.SocketTestPermutation;
import io.netty.util.concurrent.DefaultExecutorServiceFactory; import io.netty.util.concurrent.DefaultExecutorServiceFactory;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -45,6 +52,8 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
static final EventLoopGroup EPOLL_WORKER_GROUP = static final EventLoopGroup EPOLL_WORKER_GROUP =
new EpollEventLoopGroup(WORKERS, new DefaultExecutorServiceFactory("testsuite-epoll-worker")); new EpollEventLoopGroup(WORKERS, new DefaultExecutorServiceFactory("testsuite-epoll-worker"));
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollSocketTestPermutation.class);
@Override @Override
public List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> socket() { public List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> socket() {
@ -59,22 +68,34 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public List<BootstrapFactory<ServerBootstrap>> serverSocket() { public List<BootstrapFactory<ServerBootstrap>> serverSocket() {
return Arrays.asList( List<BootstrapFactory<ServerBootstrap>> toReturn = new ArrayList<BootstrapFactory<ServerBootstrap>>();
new BootstrapFactory<ServerBootstrap>() { toReturn.add(new BootstrapFactory<ServerBootstrap>() {
@Override @Override
public ServerBootstrap newInstance() { public ServerBootstrap newInstance() {
return new ServerBootstrap().group(EPOLL_BOSS_GROUP, EPOLL_WORKER_GROUP) return new ServerBootstrap().group(EPOLL_BOSS_GROUP, EPOLL_WORKER_GROUP)
.channel(EpollServerSocketChannel.class); .channel(EpollServerSocketChannel.class);
} }
}, });
new BootstrapFactory<ServerBootstrap>() { if (isServerFastOpen()) {
@Override toReturn.add(new BootstrapFactory<ServerBootstrap>() {
public ServerBootstrap newInstance() { @Override
return new ServerBootstrap().group(nioBossGroup, nioWorkerGroup) public ServerBootstrap newInstance() {
.channel(NioServerSocketChannel.class); ServerBootstrap serverBootstrap = new ServerBootstrap().group(EPOLL_BOSS_GROUP, EPOLL_WORKER_GROUP)
} .channel(EpollServerSocketChannel.class);
serverBootstrap.option(EpollChannelOption.TCP_FASTOPEN, 5);
return serverBootstrap;
} }
); });
}
toReturn.add(new BootstrapFactory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().group(nioBossGroup, nioWorkerGroup)
.channel(NioServerSocketChannel.class);
}
});
return toReturn;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -157,6 +178,41 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
); );
} }
public boolean isServerFastOpen() {
return AccessController.doPrivileged(new PrivilegedAction<Integer>() {
@Override
public Integer run() {
int fastopen = 0;
File file = new File("/proc/sys/net/ipv4/tcp_fastopen");
if (file.exists()) {
BufferedReader in = null;
try {
in = new BufferedReader(new FileReader(file));
fastopen = Integer.parseInt(in.readLine());
if (logger.isDebugEnabled()) {
logger.debug("{}: {}", file, fastopen);
}
} catch (Exception e) {
logger.debug("Failed to get TCP_FASTOPEN from: {}", file, e);
} finally {
if (in != null) {
try {
in.close();
} catch (Exception e) {
// Ignored.
}
}
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("{}: {} (non-existent)", file, fastopen);
}
}
return fastopen;
}
}) == 3;
}
public static DomainSocketAddress newSocketAddress() { public static DomainSocketAddress newSocketAddress() {
try { try {
File file = File.createTempFile("netty", "dsocket"); File file = File.createTempFile("netty", "dsocket");