TCP_NOTSENT_LOWAT socket option support
Motiviation: Linux provides the TCP_NOTSENT_LOWAT socket option. This can be used to control how much unsent data is queued in the tcp kernel buffers. This can be important when application level protocols (SPDY, HTTP/2) have their own priority mechanism and don't want data queued in the kernel. Modifications: - The epoll module will have an additional socket option TCP_NOTSENT_LOWAT - There will be JNI methods to control the underlying linux socket option mechanism Result: Linux EPOLL module exposes the TCP_NOTSENT_LOWAT socket option.
This commit is contained in:
parent
4e5d65cdce
commit
8caa5848d4
@ -22,7 +22,7 @@
|
||||
#include <sys/eventfd.h>
|
||||
#include <sys/sendfile.h>
|
||||
#include <sys/un.h>
|
||||
#include <netinet/tcp.h>
|
||||
#include <linux/tcp.h> // TCP_NOTSENT_LOWAT is a linux specific define
|
||||
#include <netinet/in.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
@ -1204,7 +1204,11 @@ JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setKeepAlive(JNIEnv* e
|
||||
}
|
||||
|
||||
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv* env, jclass clazz, jint fd, jint optval) {
|
||||
setOption(env, fd, SOL_TCP, TCP_CORK, &optval, sizeof(optval));
|
||||
setOption(env, fd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
|
||||
}
|
||||
|
||||
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpNotSentLowAt(JNIEnv* env, jclass clazz, jint fd, jint optval) {
|
||||
setOption(env, fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &optval, sizeof(optval));
|
||||
}
|
||||
|
||||
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setSoLinger(JNIEnv* env, jclass clazz, jint fd, jint optval) {
|
||||
@ -1228,15 +1232,15 @@ JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setBroadcast(JNIEnv* e
|
||||
}
|
||||
|
||||
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpKeepIdle(JNIEnv* env, jclass clazz, jint fd, jint optval) {
|
||||
setOption(env, fd, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
|
||||
setOption(env, fd, IPPROTO_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
|
||||
}
|
||||
|
||||
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpKeepIntvl(JNIEnv* env, jclass clazz, jint fd, jint optval) {
|
||||
setOption(env, fd, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval));
|
||||
setOption(env, fd, IPPROTO_TCP, TCP_KEEPINTVL, &optval, sizeof(optval));
|
||||
}
|
||||
|
||||
JNIEXPORT void Java_io_netty_channel_epoll_Native_setTcpKeepCnt(JNIEnv* env, jclass clazz, jint fd, jint optval) {
|
||||
setOption(env, fd, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(optval));
|
||||
setOption(env, fd, IPPROTO_TCP, TCP_KEEPCNT, &optval, sizeof(optval));
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isReuseAddresss(JNIEnv* env, jclass clazz, jint fd) {
|
||||
@ -1281,7 +1285,15 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getSendBufferSize(JNIE
|
||||
|
||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isTcpCork(JNIEnv* env, jclass clazz, jint fd) {
|
||||
int optval;
|
||||
if (getOption(env, fd, SOL_TCP, TCP_CORK, &optval, sizeof(optval)) == -1) {
|
||||
if (getOption(env, fd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval)) == -1) {
|
||||
return -1;
|
||||
}
|
||||
return optval;
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTcpNotSentLowAt(JNIEnv* env, jclass clazz, jint fd) {
|
||||
int optval;
|
||||
if (getOption(env, fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &optval, sizeof(optval)) == -1) {
|
||||
return -1;
|
||||
}
|
||||
return optval;
|
||||
@ -1325,7 +1337,7 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isBroadcast(JNIEnv* en
|
||||
|
||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTcpKeepIdle(JNIEnv* env, jclass clazz, jint fd) {
|
||||
int optval;
|
||||
if (getOption(env, fd, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval)) == -1) {
|
||||
if (getOption(env, fd, IPPROTO_TCP, TCP_KEEPIDLE, &optval, sizeof(optval)) == -1) {
|
||||
return -1;
|
||||
}
|
||||
return optval;
|
||||
@ -1333,7 +1345,7 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTcpKeepIdle(JNIEnv*
|
||||
|
||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTcpKeepIntvl(JNIEnv* env, jclass clazz, jint fd) {
|
||||
int optval;
|
||||
if (getOption(env, fd, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval)) == -1) {
|
||||
if (getOption(env, fd, IPPROTO_TCP, TCP_KEEPINTVL, &optval, sizeof(optval)) == -1) {
|
||||
return -1;
|
||||
}
|
||||
return optval;
|
||||
@ -1341,7 +1353,7 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTcpKeepIntvl(JNIEnv
|
||||
|
||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTcpKeepCnt(JNIEnv* env, jclass clazz, jint fd) {
|
||||
int optval;
|
||||
if (getOption(env, fd, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(optval)) == -1) {
|
||||
if (getOption(env, fd, IPPROTO_TCP, TCP_KEEPCNT, &optval, sizeof(optval)) == -1) {
|
||||
return -1;
|
||||
}
|
||||
return optval;
|
||||
@ -1349,7 +1361,7 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTcpKeepCnt(JNIEnv*
|
||||
|
||||
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_tcpInfo0(JNIEnv* env, jclass clazz, jint fd, jintArray array) {
|
||||
struct tcp_info tcp_info;
|
||||
if (getOption(env, fd, SOL_TCP, TCP_INFO, &tcp_info, sizeof(tcp_info)) == -1) {
|
||||
if (getOption(env, fd, IPPROTO_TCP, TCP_INFO, &tcp_info, sizeof(tcp_info)) == -1) {
|
||||
return;
|
||||
}
|
||||
unsigned int cArray[32];
|
||||
|
@ -80,6 +80,7 @@ void Java_io_netty_channel_epoll_Native_setReceiveBufferSize(JNIEnv* env, jclass
|
||||
void Java_io_netty_channel_epoll_Native_setSendBufferSize(JNIEnv* env, jclass clazz, jint fd, jint optval);
|
||||
void Java_io_netty_channel_epoll_Native_setKeepAlive(JNIEnv* env, jclass clazz, jint fd, jint optval);
|
||||
void Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv* env, jclass clazz, jint fd, jint optval);
|
||||
void Java_io_netty_channel_epoll_Native_setTcpNotSentLowAt(JNIEnv* env, jclass clazz, jint fd, jint optval);
|
||||
void Java_io_netty_channel_epoll_Native_setSoLinger(JNIEnv* env, jclass clazz, jint fd, jint optval);
|
||||
void Java_io_netty_channel_epoll_Native_setTrafficClass(JNIEnv* env, jclass clazz, jint fd, jint optval);
|
||||
void Java_io_netty_channel_epoll_Native_setBroadcast(JNIEnv* env, jclass clazz, jint fd, jint optval);
|
||||
@ -93,6 +94,7 @@ jint Java_io_netty_channel_epoll_Native_isTcpNoDelay(JNIEnv* env, jclass clazz,
|
||||
jint Java_io_netty_channel_epoll_Native_getReceiveBufferSize(JNIEnv* env, jclass clazz, jint fd);
|
||||
jint Java_io_netty_channel_epoll_Native_getSendBufferSize(JNIEnv* env, jclass clazz, jint fd);
|
||||
jint Java_io_netty_channel_epoll_Native_isTcpCork(JNIEnv* env, jclass clazz, jint fd);
|
||||
jint Java_io_netty_channel_epoll_Native_getTcpNotSentLowAt(JNIEnv* env, jclass clazz, jint fd);
|
||||
jint Java_io_netty_channel_epoll_Native_getSoLinger(JNIEnv* env, jclass clazz, jint fd);
|
||||
jint Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv* env, jclass clazz, jint fd);
|
||||
jint Java_io_netty_channel_epoll_Native_isBroadcast(JNIEnv* env, jclass clazz, jint fd);
|
||||
|
@ -23,6 +23,7 @@ public final class EpollChannelOption {
|
||||
|
||||
public static final ChannelOption<Boolean> TCP_CORK = ChannelOption.valueOf(T, "TCP_CORK");
|
||||
public static final ChannelOption<Boolean> SO_REUSEPORT = ChannelOption.valueOf(T, "SO_REUSEPORT");
|
||||
public static final ChannelOption<Long> TCP_NOTSENT_LOWAT = ChannelOption.valueOf(T, "TCP_NOTSENT_LOWAT");
|
||||
public static final ChannelOption<Integer> TCP_KEEPIDLE = ChannelOption.valueOf(T, "TCP_KEEPIDLE");
|
||||
public static final ChannelOption<Integer> TCP_KEEPINTVL = ChannelOption.valueOf(T, "TCP_KEEPINTVL");
|
||||
public static final ChannelOption<Integer> TCP_KEEPCNT = ChannelOption.valueOf(T, "TCP_KEEPCNT");
|
||||
|
@ -16,9 +16,7 @@
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.MessageSizeEstimator;
|
||||
import io.netty.channel.RecvByteBufAllocator;
|
||||
import io.netty.channel.unix.DomainSocketChannelConfig;
|
||||
@ -28,8 +26,7 @@ import java.util.Map;
|
||||
|
||||
public final class EpollDomainSocketChannelConfig extends EpollChannelConfig
|
||||
implements DomainSocketChannelConfig {
|
||||
private volatile DomainSocketReadMode mode =
|
||||
DomainSocketReadMode.BYTES;
|
||||
private volatile DomainSocketReadMode mode = DomainSocketReadMode.BYTES;
|
||||
|
||||
EpollDomainSocketChannelConfig(AbstractEpollChannel channel) {
|
||||
super(channel);
|
||||
|
@ -27,7 +27,7 @@ import java.util.Map;
|
||||
import static io.netty.channel.ChannelOption.*;
|
||||
|
||||
public final class EpollSocketChannelConfig extends EpollChannelConfig implements SocketChannelConfig {
|
||||
|
||||
private static final long MAX_UINT32_T = 0xFFFFFFFFL;
|
||||
private final EpollSocketChannel channel;
|
||||
private volatile boolean allowHalfClosure;
|
||||
|
||||
@ -48,8 +48,8 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement
|
||||
return getOptions(
|
||||
super.getOptions(),
|
||||
SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS,
|
||||
ALLOW_HALF_CLOSURE, EpollChannelOption.TCP_CORK, EpollChannelOption.TCP_KEEPCNT,
|
||||
EpollChannelOption.TCP_KEEPIDLE, EpollChannelOption.TCP_KEEPINTVL);
|
||||
ALLOW_HALF_CLOSURE, EpollChannelOption.TCP_CORK, EpollChannelOption.TCP_NOTSENT_LOWAT,
|
||||
EpollChannelOption.TCP_KEEPCNT, EpollChannelOption.TCP_KEEPIDLE, EpollChannelOption.TCP_KEEPINTVL);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@ -82,6 +82,9 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement
|
||||
if (option == EpollChannelOption.TCP_CORK) {
|
||||
return (T) Boolean.valueOf(isTcpCork());
|
||||
}
|
||||
if (option == EpollChannelOption.TCP_NOTSENT_LOWAT) {
|
||||
return (T) Long.valueOf(getTcpNotSentLowAt());
|
||||
}
|
||||
if (option == EpollChannelOption.TCP_KEEPIDLE) {
|
||||
return (T) Integer.valueOf(getTcpKeepIdle());
|
||||
}
|
||||
@ -116,6 +119,8 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement
|
||||
setAllowHalfClosure((Boolean) value);
|
||||
} else if (option == EpollChannelOption.TCP_CORK) {
|
||||
setTcpCork((Boolean) value);
|
||||
} else if (option == EpollChannelOption.TCP_NOTSENT_LOWAT) {
|
||||
setTcpNotSentLowAt((Long) value);
|
||||
} else if (option == EpollChannelOption.TCP_KEEPIDLE) {
|
||||
setTcpKeepIdle((Integer) value);
|
||||
} else if (option == EpollChannelOption.TCP_KEEPCNT) {
|
||||
@ -171,6 +176,14 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement
|
||||
return Native.isTcpCork(channel.fd().intValue()) == 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the {@code TCP_NOTSENT_LOWAT} option on the socket. See {@code man 7 tcp} for more details.
|
||||
* @return value is a uint32_t
|
||||
*/
|
||||
public long getTcpNotSentLowAt() {
|
||||
return Native.getTcpNotSentLowAt(channel.fd().intValue()) & MAX_UINT32_T;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the {@code TCP_KEEPIDLE} option on the socket. See {@code man 7 tcp} for more details.
|
||||
*/
|
||||
@ -242,6 +255,18 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@code TCP_NOTSENT_LOWAT} option on the socket. See {@code man 7 tcp} for more details.
|
||||
* @param tcpNotSentLowAt is a uint32_t
|
||||
*/
|
||||
public EpollSocketChannelConfig setTcpNotSentLowAt(long tcpNotSentLowAt) {
|
||||
if (tcpNotSentLowAt < 0 || tcpNotSentLowAt > MAX_UINT32_T) {
|
||||
throw new IllegalArgumentException("tcpNotSentLowAt must be a uint32_t");
|
||||
}
|
||||
Native.setTcpNotSentLowAt(channel.fd().intValue(), (int) tcpNotSentLowAt);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EpollSocketChannelConfig setTrafficClass(int trafficClass) {
|
||||
Native.setTrafficClass(channel.fd().intValue(), trafficClass);
|
||||
|
@ -626,6 +626,7 @@ public final class Native {
|
||||
public static native int isReusePort(int fd);
|
||||
public static native int isTcpNoDelay(int fd);
|
||||
public static native int isTcpCork(int fd);
|
||||
public static native int getTcpNotSentLowAt(int fd);
|
||||
public static native int getSoLinger(int fd);
|
||||
public static native int getTrafficClass(int fd);
|
||||
public static native int isBroadcast(int fd);
|
||||
@ -641,6 +642,7 @@ public final class Native {
|
||||
public static native void setSendBufferSize(int fd, int sendBufferSize);
|
||||
public static native void setTcpNoDelay(int fd, int tcpNoDelay);
|
||||
public static native void setTcpCork(int fd, int tcpCork);
|
||||
public static native void setTcpNotSentLowAt(int fd, int tcpNotSentLowAt);
|
||||
public static native void setSoLinger(int fd, int soLinger);
|
||||
public static native void setTrafficClass(int fd, int tcpNoDelay);
|
||||
public static native void setBroadcast(int fd, int broadcast);
|
||||
|
@ -0,0 +1,93 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Random;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class EpollSocketChannelConfigTest {
|
||||
|
||||
private static EventLoopGroup group;
|
||||
private static EpollSocketChannel ch;
|
||||
private static Random rand;
|
||||
|
||||
@BeforeClass
|
||||
public static void before() {
|
||||
rand = new Random();
|
||||
group = new EpollEventLoopGroup(1);
|
||||
Bootstrap bootstrap = new Bootstrap();
|
||||
ch = (EpollSocketChannel) bootstrap.group(group)
|
||||
.channel(EpollSocketChannel.class)
|
||||
.handler(new ChannelInboundHandlerAdapter())
|
||||
.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void after() {
|
||||
group.shutdownGracefully();
|
||||
}
|
||||
|
||||
private long randLong(long min, long max) {
|
||||
return min + nextLong(max - min + 1);
|
||||
}
|
||||
|
||||
private long nextLong(long n) {
|
||||
long bits, val;
|
||||
do {
|
||||
bits = (rand.nextLong() << 1) >>> 1;
|
||||
val = bits % n;
|
||||
} while (bits - val + (n - 1) < 0L);
|
||||
return val;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRandomTcpNotSentLowAt() {
|
||||
final long value = randLong(0, 0xFFFFFFFFL);
|
||||
ch.config().setTcpNotSentLowAt(value);
|
||||
assertEquals(value, ch.config().getTcpNotSentLowAt());
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testInvalidHighTcpNotSentLowAt() {
|
||||
final long value = 0xFFFFFFFFL + 1;
|
||||
ch.config().setTcpNotSentLowAt(value);
|
||||
fail();
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testInvalidLowTcpNotSentLowAt() {
|
||||
final long value = -1;
|
||||
ch.config().setTcpNotSentLowAt(value);
|
||||
fail();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTcpCork() {
|
||||
ch.config().setTcpCork(false);
|
||||
assertFalse(ch.config().isTcpCork());
|
||||
ch.config().setTcpCork(true);
|
||||
assertTrue(ch.config().isTcpCork());
|
||||
}
|
||||
}
|
@ -56,6 +56,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
|
||||
return list;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public List<BootstrapFactory<ServerBootstrap>> serverSocket() {
|
||||
return Arrays.asList(
|
||||
@ -76,6 +77,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
|
||||
);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public List<BootstrapFactory<Bootstrap>> clientSocket() {
|
||||
return Arrays.asList(
|
||||
@ -97,6 +99,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
|
||||
@Override
|
||||
public List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> datagram() {
|
||||
// Make the list of Bootstrap factories.
|
||||
@SuppressWarnings("unchecked")
|
||||
List<BootstrapFactory<Bootstrap>> bfs = Arrays.asList(
|
||||
new BootstrapFactory<Bootstrap>() {
|
||||
@Override
|
||||
|
Loading…
Reference in New Issue
Block a user