TCP_NOTSENT_LOWAT socket option support

Motiviation:
Linux provides the TCP_NOTSENT_LOWAT socket option. This can be used to control how much unsent data is queued in the tcp kernel buffers. This can be important when application level protocols (SPDY, HTTP/2) have their own priority mechanism and don't want data queued in the kernel.

Modifications:
- The epoll module will have an additional socket option TCP_NOTSENT_LOWAT
- There will be JNI methods to control the underlying linux socket option mechanism

Result:
Linux EPOLL module exposes the TCP_NOTSENT_LOWAT socket option.
This commit is contained in:
Scott Mitchell 2015-05-27 18:37:56 -07:00
parent 3adc6566d4
commit a17e1c89b6
8 changed files with 154 additions and 20 deletions

View File

@ -22,7 +22,7 @@
#include <sys/eventfd.h>
#include <sys/sendfile.h>
#include <sys/un.h>
#include <netinet/tcp.h>
#include <linux/tcp.h> // TCP_NOTSENT_LOWAT is a linux specific define
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
@ -1204,7 +1204,11 @@ JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setKeepAlive(JNIEnv* e
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv* env, jclass clazz, jint fd, jint optval) {
setOption(env, fd, SOL_TCP, TCP_CORK, &optval, sizeof(optval));
setOption(env, fd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpNotSentLowAt(JNIEnv* env, jclass clazz, jint fd, jint optval) {
setOption(env, fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &optval, sizeof(optval));
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setSoLinger(JNIEnv* env, jclass clazz, jint fd, jint optval) {
@ -1228,15 +1232,15 @@ JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setBroadcast(JNIEnv* e
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpKeepIdle(JNIEnv* env, jclass clazz, jint fd, jint optval) {
setOption(env, fd, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
setOption(env, fd, IPPROTO_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpKeepIntvl(JNIEnv* env, jclass clazz, jint fd, jint optval) {
setOption(env, fd, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval));
setOption(env, fd, IPPROTO_TCP, TCP_KEEPINTVL, &optval, sizeof(optval));
}
JNIEXPORT void Java_io_netty_channel_epoll_Native_setTcpKeepCnt(JNIEnv* env, jclass clazz, jint fd, jint optval) {
setOption(env, fd, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(optval));
setOption(env, fd, IPPROTO_TCP, TCP_KEEPCNT, &optval, sizeof(optval));
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isReuseAddresss(JNIEnv* env, jclass clazz, jint fd) {
@ -1281,7 +1285,15 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getSendBufferSize(JNIE
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isTcpCork(JNIEnv* env, jclass clazz, jint fd) {
int optval;
if (getOption(env, fd, SOL_TCP, TCP_CORK, &optval, sizeof(optval)) == -1) {
if (getOption(env, fd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval)) == -1) {
return -1;
}
return optval;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTcpNotSentLowAt(JNIEnv* env, jclass clazz, jint fd) {
int optval;
if (getOption(env, fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &optval, sizeof(optval)) == -1) {
return -1;
}
return optval;
@ -1325,7 +1337,7 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isBroadcast(JNIEnv* en
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTcpKeepIdle(JNIEnv* env, jclass clazz, jint fd) {
int optval;
if (getOption(env, fd, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval)) == -1) {
if (getOption(env, fd, IPPROTO_TCP, TCP_KEEPIDLE, &optval, sizeof(optval)) == -1) {
return -1;
}
return optval;
@ -1333,7 +1345,7 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTcpKeepIdle(JNIEnv*
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTcpKeepIntvl(JNIEnv* env, jclass clazz, jint fd) {
int optval;
if (getOption(env, fd, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval)) == -1) {
if (getOption(env, fd, IPPROTO_TCP, TCP_KEEPINTVL, &optval, sizeof(optval)) == -1) {
return -1;
}
return optval;
@ -1341,7 +1353,7 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTcpKeepIntvl(JNIEnv
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTcpKeepCnt(JNIEnv* env, jclass clazz, jint fd) {
int optval;
if (getOption(env, fd, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(optval)) == -1) {
if (getOption(env, fd, IPPROTO_TCP, TCP_KEEPCNT, &optval, sizeof(optval)) == -1) {
return -1;
}
return optval;
@ -1349,7 +1361,7 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTcpKeepCnt(JNIEnv*
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_tcpInfo0(JNIEnv* env, jclass clazz, jint fd, jintArray array) {
struct tcp_info tcp_info;
if (getOption(env, fd, SOL_TCP, TCP_INFO, &tcp_info, sizeof(tcp_info)) == -1) {
if (getOption(env, fd, IPPROTO_TCP, TCP_INFO, &tcp_info, sizeof(tcp_info)) == -1) {
return;
}
unsigned int cArray[32];

View File

@ -80,6 +80,7 @@ void Java_io_netty_channel_epoll_Native_setReceiveBufferSize(JNIEnv* env, jclass
void Java_io_netty_channel_epoll_Native_setSendBufferSize(JNIEnv* env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setKeepAlive(JNIEnv* env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv* env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setTcpNotSentLowAt(JNIEnv* env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setSoLinger(JNIEnv* env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setTrafficClass(JNIEnv* env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setBroadcast(JNIEnv* env, jclass clazz, jint fd, jint optval);
@ -93,6 +94,7 @@ jint Java_io_netty_channel_epoll_Native_isTcpNoDelay(JNIEnv* env, jclass clazz,
jint Java_io_netty_channel_epoll_Native_getReceiveBufferSize(JNIEnv* env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_getSendBufferSize(JNIEnv* env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_isTcpCork(JNIEnv* env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_getTcpNotSentLowAt(JNIEnv* env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_getSoLinger(JNIEnv* env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv* env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_isBroadcast(JNIEnv* env, jclass clazz, jint fd);
@ -123,4 +125,4 @@ jint Java_io_netty_channel_epoll_Native_sizeofEpollEvent(JNIEnv* env, jclass cla
jint Java_io_netty_channel_epoll_Native_offsetofEpollData(JNIEnv* env, jclass clazz);
jlong Java_io_netty_channel_epoll_Native_pipe0(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_splice0(JNIEnv* env, jclass clazz, jint fd, jint offIn, jint fdOut, jint offOut, jint len);
jint Java_io_netty_channel_epoll_Native_splice0(JNIEnv* env, jclass clazz, jint fd, jint offIn, jint fdOut, jint offOut, jint len);

View File

@ -21,12 +21,12 @@ import io.netty.channel.unix.DomainSocketReadMode;
public final class EpollChannelOption<T> extends ChannelOption<T> {
public static final ChannelOption<Boolean> TCP_CORK = valueOf("TCP_CORK");
public static final ChannelOption<Boolean> SO_REUSEPORT = valueOf("SO_REUSEPORT");
public static final ChannelOption<Long> TCP_NOTSENT_LOWAT = valueOf("TCP_NOTSENT_LOWAT");
public static final ChannelOption<Integer> TCP_KEEPIDLE = valueOf("TCP_KEEPIDLE");
public static final ChannelOption<Integer> TCP_KEEPINTVL = valueOf("TCP_KEEPINTVL");
public static final ChannelOption<Integer> TCP_KEEPCNT = valueOf("TCP_KEEPCNT");
public static final ChannelOption<Boolean> SO_REUSEPORT = valueOf("SO_REUSEPORT");
public static final ChannelOption<DomainSocketReadMode> DOMAIN_SOCKET_READ_MODE =
valueOf("DOMAIN_SOCKET_READ_MODE");

View File

@ -16,9 +16,7 @@
package io.netty.channel.epoll;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.unix.DomainSocketChannelConfig;
@ -28,8 +26,7 @@ import java.util.Map;
public final class EpollDomainSocketChannelConfig extends EpollChannelConfig
implements DomainSocketChannelConfig {
private volatile DomainSocketReadMode mode =
DomainSocketReadMode.BYTES;
private volatile DomainSocketReadMode mode = DomainSocketReadMode.BYTES;
EpollDomainSocketChannelConfig(AbstractEpollChannel channel) {
super(channel);

View File

@ -27,7 +27,7 @@ import java.util.Map;
import static io.netty.channel.ChannelOption.*;
public final class EpollSocketChannelConfig extends EpollChannelConfig implements SocketChannelConfig {
private static final long MAX_UINT32_T = 0xFFFFFFFFL;
private final EpollSocketChannel channel;
private volatile boolean allowHalfClosure;
@ -48,8 +48,8 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement
return getOptions(
super.getOptions(),
SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS,
ALLOW_HALF_CLOSURE, EpollChannelOption.TCP_CORK, EpollChannelOption.TCP_KEEPCNT,
EpollChannelOption.TCP_KEEPIDLE, EpollChannelOption.TCP_KEEPINTVL);
ALLOW_HALF_CLOSURE, EpollChannelOption.TCP_CORK, EpollChannelOption.TCP_NOTSENT_LOWAT,
EpollChannelOption.TCP_KEEPCNT, EpollChannelOption.TCP_KEEPIDLE, EpollChannelOption.TCP_KEEPINTVL);
}
@SuppressWarnings("unchecked")
@ -82,6 +82,9 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement
if (option == EpollChannelOption.TCP_CORK) {
return (T) Boolean.valueOf(isTcpCork());
}
if (option == EpollChannelOption.TCP_NOTSENT_LOWAT) {
return (T) Long.valueOf(getTcpNotSentLowAt());
}
if (option == EpollChannelOption.TCP_KEEPIDLE) {
return (T) Integer.valueOf(getTcpKeepIdle());
}
@ -116,6 +119,8 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement
setAllowHalfClosure((Boolean) value);
} else if (option == EpollChannelOption.TCP_CORK) {
setTcpCork((Boolean) value);
} else if (option == EpollChannelOption.TCP_NOTSENT_LOWAT) {
setTcpNotSentLowAt((Long) value);
} else if (option == EpollChannelOption.TCP_KEEPIDLE) {
setTcpKeepIdle((Integer) value);
} else if (option == EpollChannelOption.TCP_KEEPCNT) {
@ -171,6 +176,14 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement
return Native.isTcpCork(channel.fd().intValue()) == 1;
}
/**
* Get the {@code TCP_NOTSENT_LOWAT} option on the socket. See {@code man 7 tcp} for more details.
* @return value is a uint32_t
*/
public long getTcpNotSentLowAt() {
return Native.getTcpNotSentLowAt(channel.fd().intValue()) & MAX_UINT32_T;
}
/**
* Get the {@code TCP_KEEPIDLE} option on the socket. See {@code man 7 tcp} for more details.
*/
@ -242,6 +255,18 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement
return this;
}
/**
* Set the {@code TCP_NOTSENT_LOWAT} option on the socket. See {@code man 7 tcp} for more details.
* @param tcpNotSentLowAt is a uint32_t
*/
public EpollSocketChannelConfig setTcpNotSentLowAt(long tcpNotSentLowAt) {
if (tcpNotSentLowAt < 0 || tcpNotSentLowAt > MAX_UINT32_T) {
throw new IllegalArgumentException("tcpNotSentLowAt must be a uint32_t");
}
Native.setTcpNotSentLowAt(channel.fd().intValue(), (int) tcpNotSentLowAt);
return this;
}
@Override
public EpollSocketChannelConfig setTrafficClass(int trafficClass) {
Native.setTrafficClass(channel.fd().intValue(), trafficClass);

View File

@ -626,6 +626,7 @@ public final class Native {
public static native int isReusePort(int fd);
public static native int isTcpNoDelay(int fd);
public static native int isTcpCork(int fd);
public static native int getTcpNotSentLowAt(int fd);
public static native int getSoLinger(int fd);
public static native int getTrafficClass(int fd);
public static native int isBroadcast(int fd);
@ -641,6 +642,7 @@ public final class Native {
public static native void setSendBufferSize(int fd, int sendBufferSize);
public static native void setTcpNoDelay(int fd, int tcpNoDelay);
public static native void setTcpCork(int fd, int tcpCork);
public static native void setTcpNotSentLowAt(int fd, int tcpNotSentLowAt);
public static native void setSoLinger(int fd, int soLinger);
public static native void setTrafficClass(int fd, int tcpNoDelay);
public static native void setBroadcast(int fd, int broadcast);

View File

@ -0,0 +1,93 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import static org.junit.Assert.*;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import java.net.InetSocketAddress;
import java.util.Random;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class EpollSocketChannelConfigTest {
private static EventLoopGroup group;
private static EpollSocketChannel ch;
private static Random rand;
@BeforeClass
public static void before() {
rand = new Random();
group = new EpollEventLoopGroup(1);
Bootstrap bootstrap = new Bootstrap();
ch = (EpollSocketChannel) bootstrap.group(group)
.channel(EpollSocketChannel.class)
.handler(new ChannelInboundHandlerAdapter())
.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
}
@AfterClass
public static void after() {
group.shutdownGracefully();
}
private long randLong(long min, long max) {
return min + nextLong(max - min + 1);
}
private long nextLong(long n) {
long bits, val;
do {
bits = (rand.nextLong() << 1) >>> 1;
val = bits % n;
} while (bits - val + (n - 1) < 0L);
return val;
}
@Test
public void testRandomTcpNotSentLowAt() {
final long value = randLong(0, 0xFFFFFFFFL);
ch.config().setTcpNotSentLowAt(value);
assertEquals(value, ch.config().getTcpNotSentLowAt());
}
@Test(expected = IllegalArgumentException.class)
public void testInvalidHighTcpNotSentLowAt() {
final long value = 0xFFFFFFFFL + 1;
ch.config().setTcpNotSentLowAt(value);
fail();
}
@Test(expected = IllegalArgumentException.class)
public void testInvalidLowTcpNotSentLowAt() {
final long value = -1;
ch.config().setTcpNotSentLowAt(value);
fail();
}
@Test
public void testTcpCork() {
ch.config().setTcpCork(false);
assertFalse(ch.config().isTcpCork());
ch.config().setTcpCork(true);
assertTrue(ch.config().isTcpCork());
}
}

View File

@ -56,6 +56,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
return list;
}
@SuppressWarnings("unchecked")
@Override
public List<BootstrapFactory<ServerBootstrap>> serverSocket() {
return Arrays.asList(
@ -76,6 +77,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
);
}
@SuppressWarnings("unchecked")
@Override
public List<BootstrapFactory<Bootstrap>> clientSocket() {
return Arrays.asList(
@ -97,6 +99,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
@Override
public List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> datagram() {
// Make the list of Bootstrap factories.
@SuppressWarnings("unchecked")
List<BootstrapFactory<Bootstrap>> bfs = Arrays.asList(
new BootstrapFactory<Bootstrap>() {
@Override