Enforce writeSpinCount to limit resource consumption per socket (#7478)

Motivation:
The writeSpinCount currently loops over the same buffer, gathering
write, file write, or other write operation multiple times but will
continue writing until there is nothing left or the OS doesn't accept
any data for that specific write. However if the OS keeps accepting
writes there is no way to limit how much time we spend on a specific
socket. This can lead to unfair consumption of resources dedicated to a
single socket.
We currently don't limit the amount of bytes we attempt to write per
gathering write. If there are many more bytes pending relative to the
SO_SNDBUF size we will end up building iov arrays with more elements
than can be written, which results in extra iteration, conditionals,
and book keeping.

Modifications:
- writeSpinCount should limit the number of system calls we make to
write data, instead of applying to individual write operations
- IovArray should support a maximum number of bytes
- IovArray should support composite buffers of greater than size 1024
- We should auto-scale the amount of data that we attempt to write per
gathering write operation relative to SO_SNDBUF and how much data is
successfully written
- The non-unsafe path should also support a maximum number of bytes,
and respect the IOV_MAX limit

Result:
Write resource consumption can be bounded and gathering writes have
a limit relative to the amount of data which can actually be accepted
by the socket.
This commit is contained in:
Scott Mitchell 2017-12-07 16:00:52 -08:00 committed by GitHub
parent f2b1d95164
commit b215794de3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 984 additions and 846 deletions

View File

@ -25,6 +25,7 @@
#include <string.h> #include <string.h>
#include <errno.h> #include <errno.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <sys/sendfile.h>
#include <linux/tcp.h> // TCP_NOTSENT_LOWAT is a linux specific define #include <linux/tcp.h> // TCP_NOTSENT_LOWAT is a linux specific define
#include "netty_epoll_linuxsocket.h" #include "netty_epoll_linuxsocket.h"
@ -52,6 +53,11 @@
static jclass peerCredentialsClass = NULL; static jclass peerCredentialsClass = NULL;
static jmethodID peerCredentialsMethodId = NULL; static jmethodID peerCredentialsMethodId = NULL;
static jfieldID fileChannelFieldId = NULL;
static jfieldID transferredFieldId = NULL;
static jfieldID fdFieldId = NULL;
static jfieldID fileDescriptorFieldId = NULL;
// JNI Registered Methods Begin // JNI Registered Methods Begin
static void netty_epoll_linuxsocket_setTcpCork(JNIEnv* env, jclass clazz, jint fd, jint optval) { static void netty_epoll_linuxsocket_setTcpCork(JNIEnv* env, jclass clazz, jint fd, jint optval) {
netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval)); netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
@ -285,6 +291,39 @@ static jobject netty_epoll_linuxsocket_getPeerCredentials(JNIEnv *env, jclass cl
(*env)->SetIntArrayRegion(env, gids, 0, 1, (jint*) &credentials.gid); (*env)->SetIntArrayRegion(env, gids, 0, 1, (jint*) &credentials.gid);
return (*env)->NewObject(env, peerCredentialsClass, peerCredentialsMethodId, credentials.pid, credentials.uid, gids); return (*env)->NewObject(env, peerCredentialsClass, peerCredentialsMethodId, credentials.pid, credentials.uid, gids);
} }
static jlong netty_epoll_linuxsocket_sendFile(JNIEnv* env, jclass clazz, jint fd, jobject fileRegion, jlong base_off, jlong off, jlong len) {
jobject fileChannel = (*env)->GetObjectField(env, fileRegion, fileChannelFieldId);
if (fileChannel == NULL) {
netty_unix_errors_throwRuntimeException(env, "failed to get DefaultFileRegion.file");
return -1;
}
jobject fileDescriptor = (*env)->GetObjectField(env, fileChannel, fileDescriptorFieldId);
if (fileDescriptor == NULL) {
netty_unix_errors_throwRuntimeException(env, "failed to get FileChannelImpl.fd");
return -1;
}
jint srcFd = (*env)->GetIntField(env, fileDescriptor, fdFieldId);
if (srcFd == -1) {
netty_unix_errors_throwRuntimeException(env, "failed to get FileDescriptor.fd");
return -1;
}
ssize_t res;
off_t offset = base_off + off;
int err;
do {
res = sendfile(fd, srcFd, &offset, (size_t) len);
} while (res == -1 && ((err = errno) == EINTR));
if (res < 0) {
return -err;
}
if (res > 0) {
// update the transferred field in DefaultFileRegion
(*env)->SetLongField(env, fileRegion, transferredFieldId, off + res);
}
return res;
}
// JNI Registered Methods End // JNI Registered Methods End
// JNI Method Registration Table Begin // JNI Method Registration Table Begin
@ -314,12 +353,13 @@ static const JNINativeMethod fixed_method_table[] = {
{ "isIpTransparent", "(I)I", (void *) netty_epoll_linuxsocket_isIpTransparent }, { "isIpTransparent", "(I)I", (void *) netty_epoll_linuxsocket_isIpTransparent },
{ "getTcpInfo", "(I[J)V", (void *) netty_epoll_linuxsocket_getTcpInfo }, { "getTcpInfo", "(I[J)V", (void *) netty_epoll_linuxsocket_getTcpInfo },
{ "setTcpMd5Sig", "(I[BI[B)V", (void *) netty_epoll_linuxsocket_setTcpMd5Sig } { "setTcpMd5Sig", "(I[BI[B)V", (void *) netty_epoll_linuxsocket_setTcpMd5Sig }
// "sendFile" has a dynamic signature
}; };
static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]); static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]);
static jint dynamicMethodsTableSize() { static jint dynamicMethodsTableSize() {
return fixed_method_table_size + 1; return fixed_method_table_size + 2; // 2 is for the dynamic method signatures.
} }
static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) { static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) {
@ -331,6 +371,13 @@ static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) {
dynamicMethod->signature = netty_unix_util_prepend("(I)L", dynamicTypeName); dynamicMethod->signature = netty_unix_util_prepend("(I)L", dynamicTypeName);
dynamicMethod->fnPtr = (void *) netty_epoll_linuxsocket_getPeerCredentials; dynamicMethod->fnPtr = (void *) netty_epoll_linuxsocket_getPeerCredentials;
free(dynamicTypeName); free(dynamicTypeName);
++dynamicMethod;
dynamicTypeName = netty_unix_util_prepend(packagePrefix, "io/netty/channel/DefaultFileRegion;JJJ)J");
dynamicMethod->name = "sendFile";
dynamicMethod->signature = netty_unix_util_prepend("(IL", dynamicTypeName);
dynamicMethod->fnPtr = (void *) netty_epoll_linuxsocket_sendFile;
free(dynamicTypeName);
return dynamicMethods; return dynamicMethods;
} }
@ -377,6 +424,46 @@ jint netty_epoll_linuxsocket_JNI_OnLoad(JNIEnv* env, const char* packagePrefix)
return JNI_ERR; return JNI_ERR;
} }
nettyClassName = netty_unix_util_prepend(packagePrefix, "io/netty/channel/DefaultFileRegion");
jclass fileRegionCls = (*env)->FindClass(env, nettyClassName);
free(nettyClassName);
nettyClassName = NULL;
if (fileRegionCls == NULL) {
return JNI_ERR;
}
fileChannelFieldId = (*env)->GetFieldID(env, fileRegionCls, "file", "Ljava/nio/channels/FileChannel;");
if (fileChannelFieldId == NULL) {
netty_unix_errors_throwRuntimeException(env, "failed to get field ID: DefaultFileRegion.file");
return JNI_ERR;
}
transferredFieldId = (*env)->GetFieldID(env, fileRegionCls, "transferred", "J");
if (transferredFieldId == NULL) {
netty_unix_errors_throwRuntimeException(env, "failed to get field ID: DefaultFileRegion.transferred");
return JNI_ERR;
}
jclass fileChannelCls = (*env)->FindClass(env, "sun/nio/ch/FileChannelImpl");
if (fileChannelCls == NULL) {
// pending exception...
return JNI_ERR;
}
fileDescriptorFieldId = (*env)->GetFieldID(env, fileChannelCls, "fd", "Ljava/io/FileDescriptor;");
if (fileDescriptorFieldId == NULL) {
netty_unix_errors_throwRuntimeException(env, "failed to get field ID: FileChannelImpl.fd");
return JNI_ERR;
}
jclass fileDescriptorCls = (*env)->FindClass(env, "java/io/FileDescriptor");
if (fileDescriptorCls == NULL) {
// pending exception...
return JNI_ERR;
}
fdFieldId = (*env)->GetFieldID(env, fileDescriptorCls, "fd", "I");
if (fdFieldId == NULL) {
netty_unix_errors_throwRuntimeException(env, "failed to get field ID: FileDescriptor.fd");
return JNI_ERR;
}
return NETTY_JNI_VERSION; return NETTY_JNI_VERSION;
} }

View File

@ -21,7 +21,6 @@
#include <errno.h> #include <errno.h>
#include <sys/epoll.h> #include <sys/epoll.h>
#include <sys/eventfd.h> #include <sys/eventfd.h>
#include <sys/sendfile.h>
#include <sys/un.h> #include <sys/un.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <netinet/tcp.h> #include <netinet/tcp.h>
@ -66,11 +65,6 @@ struct mmsghdr {
#endif #endif
// Those are initialized in the init(...) method and cached for performance reasons // Those are initialized in the init(...) method and cached for performance reasons
jfieldID fileChannelFieldId = NULL;
jfieldID transferredFieldId = NULL;
jfieldID fdFieldId = NULL;
jfieldID fileDescriptorFieldId = NULL;
jfieldID packetAddrFieldId = NULL; jfieldID packetAddrFieldId = NULL;
jfieldID packetScopeIdFieldId = NULL; jfieldID packetScopeIdFieldId = NULL;
jfieldID packetPortFieldId = NULL; jfieldID packetPortFieldId = NULL;
@ -278,39 +272,6 @@ static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, job
return (jint) res; return (jint) res;
} }
static jlong netty_epoll_native_sendfile0(JNIEnv* env, jclass clazz, jint fd, jobject fileRegion, jlong base_off, jlong off, jlong len) {
jobject fileChannel = (*env)->GetObjectField(env, fileRegion, fileChannelFieldId);
if (fileChannel == NULL) {
netty_unix_errors_throwRuntimeException(env, "failed to get DefaultFileRegion.file");
return -1;
}
jobject fileDescriptor = (*env)->GetObjectField(env, fileChannel, fileDescriptorFieldId);
if (fileDescriptor == NULL) {
netty_unix_errors_throwRuntimeException(env, "failed to get FileChannelImpl.fd");
return -1;
}
jint srcFd = (*env)->GetIntField(env, fileDescriptor, fdFieldId);
if (srcFd == -1) {
netty_unix_errors_throwRuntimeException(env, "failed to get FileDescriptor.fd");
return -1;
}
ssize_t res;
off_t offset = base_off + off;
int err;
do {
res = sendfile(fd, srcFd, &offset, (size_t) len);
} while (res == -1 && ((err = errno) == EINTR));
if (res < 0) {
return -err;
}
if (res > 0) {
// update the transferred field in DefaultFileRegion
(*env)->SetLongField(env, fileRegion, transferredFieldId, off + res);
}
return res;
}
static jstring netty_epoll_native_kernelVersion(JNIEnv* env, jclass clazz) { static jstring netty_epoll_native_kernelVersion(JNIEnv* env, jclass clazz) {
struct utsname name; struct utsname name;
@ -425,7 +386,6 @@ static const JNINativeMethod fixed_method_table[] = {
{ "epollCtlMod0", "(III)I", (void *) netty_epoll_native_epollCtlMod0 }, { "epollCtlMod0", "(III)I", (void *) netty_epoll_native_epollCtlMod0 },
{ "epollCtlDel0", "(II)I", (void *) netty_epoll_native_epollCtlDel0 }, { "epollCtlDel0", "(II)I", (void *) netty_epoll_native_epollCtlDel0 },
// "sendmmsg0" has a dynamic signature // "sendmmsg0" has a dynamic signature
// "sendFile0" has a dynamic signature
{ "sizeofEpollEvent", "()I", (void *) netty_epoll_native_sizeofEpollEvent }, { "sizeofEpollEvent", "()I", (void *) netty_epoll_native_sizeofEpollEvent },
{ "offsetofEpollData", "()I", (void *) netty_epoll_native_offsetofEpollData }, { "offsetofEpollData", "()I", (void *) netty_epoll_native_offsetofEpollData },
{ "splice0", "(IJIJJ)I", (void *) netty_epoll_native_splice0 } { "splice0", "(IJIJJ)I", (void *) netty_epoll_native_splice0 }
@ -433,7 +393,7 @@ static const JNINativeMethod fixed_method_table[] = {
static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]); static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]);
static jint dynamicMethodsTableSize() { static jint dynamicMethodsTableSize() {
return fixed_method_table_size + 2; return fixed_method_table_size + 1; // 1 is for the dynamic method signatures.
} }
static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) { static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) {
@ -445,13 +405,6 @@ static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) {
dynamicMethod->signature = netty_unix_util_prepend("(I[L", dynamicTypeName); dynamicMethod->signature = netty_unix_util_prepend("(I[L", dynamicTypeName);
dynamicMethod->fnPtr = (void *) netty_epoll_native_sendmmsg0; dynamicMethod->fnPtr = (void *) netty_epoll_native_sendmmsg0;
free(dynamicTypeName); free(dynamicTypeName);
++dynamicMethod;
dynamicTypeName = netty_unix_util_prepend(packagePrefix, "io/netty/channel/DefaultFileRegion;JJJ)J");
dynamicMethod->name = "sendfile0";
dynamicMethod->signature = netty_unix_util_prepend("(IL", dynamicTypeName);
dynamicMethod->fnPtr = (void *) netty_epoll_native_sendfile0;
free(dynamicTypeName);
return dynamicMethods; return dynamicMethods;
} }
@ -504,47 +457,7 @@ static jint netty_epoll_native_JNI_OnLoad(JNIEnv* env, const char* packagePrefix
} }
// Initialize this module // Initialize this module
char* nettyClassName = netty_unix_util_prepend(packagePrefix, "io/netty/channel/DefaultFileRegion"); char* nettyClassName = netty_unix_util_prepend(packagePrefix, "io/netty/channel/epoll/NativeDatagramPacketArray$NativeDatagramPacket");
jclass fileRegionCls = (*env)->FindClass(env, nettyClassName);
free(nettyClassName);
nettyClassName = NULL;
if (fileRegionCls == NULL) {
return JNI_ERR;
}
fileChannelFieldId = (*env)->GetFieldID(env, fileRegionCls, "file", "Ljava/nio/channels/FileChannel;");
if (fileChannelFieldId == NULL) {
netty_unix_errors_throwRuntimeException(env, "failed to get field ID: DefaultFileRegion.file");
return JNI_ERR;
}
transferredFieldId = (*env)->GetFieldID(env, fileRegionCls, "transferred", "J");
if (transferredFieldId == NULL) {
netty_unix_errors_throwRuntimeException(env, "failed to get field ID: DefaultFileRegion.transferred");
return JNI_ERR;
}
jclass fileChannelCls = (*env)->FindClass(env, "sun/nio/ch/FileChannelImpl");
if (fileChannelCls == NULL) {
// pending exception...
return JNI_ERR;
}
fileDescriptorFieldId = (*env)->GetFieldID(env, fileChannelCls, "fd", "Ljava/io/FileDescriptor;");
if (fileDescriptorFieldId == NULL) {
netty_unix_errors_throwRuntimeException(env, "failed to get field ID: FileChannelImpl.fd");
return JNI_ERR;
}
jclass fileDescriptorCls = (*env)->FindClass(env, "java/io/FileDescriptor");
if (fileDescriptorCls == NULL) {
// pending exception...
return JNI_ERR;
}
fdFieldId = (*env)->GetFieldID(env, fileDescriptorCls, "fd", "I");
if (fdFieldId == NULL) {
netty_unix_errors_throwRuntimeException(env, "failed to get field ID: FileDescriptor.fd");
return JNI_ERR;
}
nettyClassName = netty_unix_util_prepend(packagePrefix, "io/netty/channel/epoll/NativeDatagramPacketArray$NativeDatagramPacket");
jclass nativeDatagramPacketCls = (*env)->FindClass(env, nettyClassName); jclass nativeDatagramPacketCls = (*env)->FindClass(env, nettyClassName);
free(nettyClassName); free(nettyClassName);
nettyClassName = NULL; nettyClassName = NULL;

View File

@ -26,6 +26,7 @@ import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException; import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
@ -50,6 +51,7 @@ import java.nio.channels.UnresolvedAddressException;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr; import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkNotNull;
@ -353,52 +355,24 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
return localReadAmount; return localReadAmount;
} }
protected final int doWriteBytes(ByteBuf buf, int writeSpinCount) throws Exception { protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
int readableBytes = buf.readableBytes();
int writtenBytes = 0;
if (buf.hasMemoryAddress()) { if (buf.hasMemoryAddress()) {
long memoryAddress = buf.memoryAddress(); int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
int readerIndex = buf.readerIndex(); if (localFlushedAmount > 0) {
int writerIndex = buf.writerIndex(); in.removeBytes(localFlushedAmount);
for (int i = writeSpinCount; i > 0; --i) { return 1;
int localFlushedAmount = socket.writeAddress(memoryAddress, readerIndex, writerIndex);
if (localFlushedAmount > 0) {
writtenBytes += localFlushedAmount;
if (writtenBytes == readableBytes) {
return writtenBytes;
}
readerIndex += localFlushedAmount;
} else {
break;
}
} }
} else { } else {
ByteBuffer nioBuf; final ByteBuffer nioBuf = buf.nioBufferCount() == 1 ?
if (buf.nioBufferCount() == 1) { buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
nioBuf = buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()); int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
} else { if (localFlushedAmount > 0) {
nioBuf = buf.nioBuffer(); nioBuf.position(nioBuf.position() + localFlushedAmount);
} in.removeBytes(localFlushedAmount);
for (int i = writeSpinCount; i > 0; --i) { return 1;
int pos = nioBuf.position();
int limit = nioBuf.limit();
int localFlushedAmount = socket.write(nioBuf, pos, limit);
if (localFlushedAmount > 0) {
nioBuf.position(pos + localFlushedAmount);
writtenBytes += localFlushedAmount;
if (writtenBytes == readableBytes) {
return writtenBytes;
}
} else {
break;
}
} }
} }
if (writtenBytes < readableBytes) { return WRITE_STATUS_SNDBUF_FULL;
// Returned EAGAIN need to set EPOLLOUT
setFlag(Native.EPOLLOUT);
}
return writtenBytes;
} }
protected abstract class AbstractEpollUnsafe extends AbstractUnsafe { protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {

View File

@ -29,6 +29,7 @@ import io.netty.channel.DefaultFileRegion;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.FileRegion; import io.netty.channel.FileRegion;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.internal.ChannelUtils;
import io.netty.channel.socket.DuplexChannel; import io.netty.channel.socket.DuplexChannel;
import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.IovArray; import io.netty.channel.unix.IovArray;
@ -49,6 +50,8 @@ import java.nio.channels.WritableByteChannel;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
import static io.netty.channel.unix.FileDescriptor.pipe; import static io.netty.channel.unix.FileDescriptor.pipe;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkNotNull;
@ -67,7 +70,12 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
private static final ClosedChannelException FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION = private static final ClosedChannelException FAIL_SPLICE_IF_CLOSED_CLOSED_CHANNEL_EXCEPTION =
ThrowableUtil.unknownStackTrace(new ClosedChannelException(), ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
AbstractEpollStreamChannel.class, "failSpliceIfClosed(...)"); AbstractEpollStreamChannel.class, "failSpliceIfClosed(...)");
private final Runnable flushTask = new Runnable() {
@Override
public void run() {
flush();
}
};
private Queue<SpliceInTask> spliceQueue; private Queue<SpliceInTask> spliceQueue;
// Lazy init these if we need to splice(...) // Lazy init these if we need to splice(...)
@ -241,291 +249,282 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
/** /**
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
* @param buf the {@link ByteBuf} from which the bytes should be written * @param in the collection which contains objects to write.
* @param buf the {@link ByteBuf} from which the bytes should be written
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
* no data was accepted</li>
* </ul>
*/ */
private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf, int writeSpinCount) throws Exception { private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
int readableBytes = buf.readableBytes(); int readableBytes = buf.readableBytes();
if (readableBytes == 0) { if (readableBytes == 0) {
in.remove(); in.remove();
return true; return 0;
} }
if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) { if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
int writtenBytes = doWriteBytes(buf, writeSpinCount); return doWriteBytes(in, buf);
in.removeBytes(writtenBytes);
return writtenBytes == readableBytes;
} else { } else {
ByteBuffer[] nioBuffers = buf.nioBuffers(); ByteBuffer[] nioBuffers = buf.nioBuffers();
return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes, writeSpinCount); return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
config().getMaxBytesPerGatheringWrite());
} }
} }
private boolean writeBytesMultiple( private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
ChannelOutboundBuffer in, IovArray array, int writeSpinCount) throws IOException { // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
// SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
long expectedWrittenBytes = array.size(); // make a best effort to adjust as OS behavior changes.
final long initialExpectedWrittenBytes = expectedWrittenBytes; if (attempted == written) {
if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
int cnt = array.count(); config().setMaxBytesPerGatheringWrite(attempted << 1);
}
} else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
config().setMaxBytesPerGatheringWrite(attempted >>> 1);
}
}
/**
* Write multiple bytes via {@link IovArray}.
* @param in the collection which contains objects to write.
* @param array The array which contains the content to write.
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
* no data was accepted</li>
* </ul>
* @throws IOException If an I/O exception occurs during write.
*/
private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
final long expectedWrittenBytes = array.size();
assert expectedWrittenBytes != 0; assert expectedWrittenBytes != 0;
final int cnt = array.count();
assert cnt != 0; assert cnt != 0;
boolean done = false; final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
int offset = 0; if (localWrittenBytes > 0) {
int end = offset + cnt; adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
for (int i = writeSpinCount; i > 0; --i) { in.removeBytes(localWrittenBytes);
long localWrittenBytes = socket.writevAddresses(array.memoryAddress(offset), cnt); return 1;
if (localWrittenBytes == 0) {
break;
}
expectedWrittenBytes -= localWrittenBytes;
if (expectedWrittenBytes == 0) {
// Written everything, just break out here (fast-path)
done = true;
break;
}
do {
long bytes = array.processWritten(offset, localWrittenBytes);
if (bytes == -1) {
// incomplete write
break;
} else {
offset++;
cnt--;
localWrittenBytes -= bytes;
}
} while (offset < end && localWrittenBytes > 0);
} }
in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes); return WRITE_STATUS_SNDBUF_FULL;
return done;
} }
private boolean writeBytesMultiple( /**
ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, * Write multiple bytes via {@link ByteBuffer} array.
int nioBufferCnt, long expectedWrittenBytes, int writeSpinCount) throws IOException { * @param in the collection which contains objects to write.
* @param nioBuffers The buffers to write.
* @param nioBufferCnt The number of buffers to write.
* @param expectedWrittenBytes The number of bytes we expect to write.
* @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
* no data was accepted</li>
* </ul>
* @throws IOException If an I/O exception occurs during write.
*/
private int writeBytesMultiple(
ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
long maxBytesPerGatheringWrite) throws IOException {
assert expectedWrittenBytes != 0; assert expectedWrittenBytes != 0;
final long initialExpectedWrittenBytes = expectedWrittenBytes; if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
expectedWrittenBytes = maxBytesPerGatheringWrite;
boolean done = false;
int offset = 0;
int end = offset + nioBufferCnt;
for (int i = writeSpinCount; i > 0; --i) {
long localWrittenBytes = socket.writev(nioBuffers, offset, nioBufferCnt);
if (localWrittenBytes == 0) {
break;
}
expectedWrittenBytes -= localWrittenBytes;
if (expectedWrittenBytes == 0) {
// Written everything, just break out here (fast-path)
done = true;
break;
}
do {
ByteBuffer buffer = nioBuffers[offset];
int pos = buffer.position();
int bytes = buffer.limit() - pos;
if (bytes > localWrittenBytes) {
buffer.position(pos + (int) localWrittenBytes);
// incomplete write
break;
} else {
offset++;
nioBufferCnt--;
localWrittenBytes -= bytes;
}
} while (offset < end && localWrittenBytes > 0);
} }
in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes); final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
return done; if (localWrittenBytes > 0) {
adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
return 1;
}
return WRITE_STATUS_SNDBUF_FULL;
} }
/** /**
* Write a {@link DefaultFileRegion} * Write a {@link DefaultFileRegion}
* * @param in the collection which contains objects to write.
* @param region the {@link DefaultFileRegion} from which the bytes should be written * @param region the {@link DefaultFileRegion} from which the bytes should be written
* @return amount the amount of written bytes * @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
* no data was accepted</li>
* </ul>
*/ */
private boolean writeDefaultFileRegion( private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
ChannelOutboundBuffer in, DefaultFileRegion region, int writeSpinCount) throws Exception {
final long regionCount = region.count(); final long regionCount = region.count();
if (region.transferred() >= regionCount) { if (region.transferred() >= regionCount) {
in.remove(); in.remove();
return true; return 0;
}
final long baseOffset = region.position();
boolean done = false;
long flushedAmount = 0;
for (int i = writeSpinCount; i > 0; --i) {
final long offset = region.transferred();
final long localFlushedAmount =
Native.sendfile(socket.intValue(), region, baseOffset, offset, regionCount - offset);
if (localFlushedAmount == 0) {
break;
}
flushedAmount += localFlushedAmount;
if (region.transferred() >= regionCount) {
done = true;
break;
}
} }
final long offset = region.transferred();
final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
if (flushedAmount > 0) { if (flushedAmount > 0) {
in.progress(flushedAmount); in.progress(flushedAmount);
if (region.transferred() >= regionCount) {
in.remove();
}
return 1;
} }
return WRITE_STATUS_SNDBUF_FULL;
if (done) {
in.remove();
}
return done;
} }
private boolean writeFileRegion( /**
ChannelOutboundBuffer in, FileRegion region, final int writeSpinCount) throws Exception { * Write a {@link FileRegion}
* @param in the collection which contains objects to write.
* @param region the {@link FileRegion} from which the bytes should be written
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
* no data was accepted</li>
* </ul>
*/
private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
if (region.transferred() >= region.count()) { if (region.transferred() >= region.count()) {
in.remove(); in.remove();
return true; return 0;
} }
boolean done = false;
long flushedAmount = 0;
if (byteChannel == null) { if (byteChannel == null) {
byteChannel = new EpollSocketWritableByteChannel(); byteChannel = new EpollSocketWritableByteChannel();
} }
for (int i = writeSpinCount; i > 0; --i) { final long flushedAmount = region.transferTo(byteChannel, region.transferred());
final long localFlushedAmount = region.transferTo(byteChannel, region.transferred());
if (localFlushedAmount == 0) {
break;
}
flushedAmount += localFlushedAmount;
if (region.transferred() >= region.count()) {
done = true;
break;
}
}
if (flushedAmount > 0) { if (flushedAmount > 0) {
in.progress(flushedAmount); in.progress(flushedAmount);
if (region.transferred() >= region.count()) {
in.remove();
}
return 1;
} }
return WRITE_STATUS_SNDBUF_FULL;
if (done) {
in.remove();
}
return done;
} }
@Override @Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception { protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = config().getWriteSpinCount(); int writeSpinCount = config().getWriteSpinCount();
for (;;) { do {
final int msgCount = in.size(); final int msgCount = in.size();
// Do gathering write if the outbound buffer entries start with more than one ByteBuf.
if (msgCount == 0) { if (msgCount > 1 && in.current() instanceof ByteBuf) {
writeSpinCount -= doWriteMultiple(in);
} else if (msgCount == 0) {
// Wrote all messages. // Wrote all messages.
clearFlag(Native.EPOLLOUT); clearFlag(Native.EPOLLOUT);
// Return here so we not set the EPOLLOUT flag. // Return here so we not set the EPOLLOUT flag.
return; return;
} else { // msgCount == 1
writeSpinCount -= doWriteSingle(in);
} }
// Do gathering write if the outbounf buffer entries start with more than one ByteBuf. // We do not break the loop here even if the outbound buffer was flushed completely,
if (msgCount > 1 && in.current() instanceof ByteBuf) { // because a user might have triggered another write and flush when we notify his or her
if (!doWriteMultiple(in, writeSpinCount)) { // listeners.
// Break the loop and so set EPOLLOUT flag. } while (writeSpinCount > 0);
break;
}
// We do not break the loop here even if the outbound buffer was flushed completely, if (writeSpinCount == 0) {
// because a user might have triggered another write and flush when we notify his or her // We used our writeSpin quantum, and should try to write again later.
// listeners. eventLoop().execute(flushTask);
} else { // msgCount == 1 } else {
if (!doWriteSingle(in, writeSpinCount)) { // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
// Break the loop and so set EPOLLOUT flag. // when it can accept more data.
break; setFlag(Native.EPOLLOUT);
}
}
} }
// Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
// when it can accept more data.
setFlag(Native.EPOLLOUT);
} }
protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception { /**
* Attempt to write a single object.
* @param in the collection which contains objects to write.
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
* no data was accepted</li>
* </ul>
* @throws Exception If an I/O error occurs.
*/
protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
// The outbound buffer contains only one message or it contains a file region. // The outbound buffer contains only one message or it contains a file region.
Object msg = in.current(); Object msg = in.current();
if (msg instanceof ByteBuf) { if (msg instanceof ByteBuf) {
if (!writeBytes(in, (ByteBuf) msg, writeSpinCount)) { return writeBytes(in, (ByteBuf) msg);
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else if (msg instanceof DefaultFileRegion) { } else if (msg instanceof DefaultFileRegion) {
if (!writeDefaultFileRegion(in, (DefaultFileRegion) msg, writeSpinCount)) { return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else if (msg instanceof FileRegion) { } else if (msg instanceof FileRegion) {
if (!writeFileRegion(in, (FileRegion) msg, writeSpinCount)) { return writeFileRegion(in, (FileRegion) msg);
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else if (msg instanceof SpliceOutTask) { } else if (msg instanceof SpliceOutTask) {
if (!((SpliceOutTask) msg).spliceOut()) { if (!((SpliceOutTask) msg).spliceOut()) {
return false; return WRITE_STATUS_SNDBUF_FULL;
} }
in.remove(); in.remove();
return 1;
} else { } else {
// Should never reach here. // Should never reach here.
throw new Error(); throw new Error();
} }
return true;
} }
private boolean doWriteMultiple(ChannelOutboundBuffer in, int writeSpinCount) throws Exception { /**
* Attempt to write multiple {@link ByteBuf} objects.
* @param in the collection which contains objects to write.
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
* no data was accepted</li>
* </ul>
* @throws Exception If an I/O error occurs.
*/
private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
if (PlatformDependent.hasUnsafe()) { if (PlatformDependent.hasUnsafe()) {
// this means we can cast to IovArray and write the IovArray directly.
IovArray array = ((EpollEventLoop) eventLoop()).cleanArray(); IovArray array = ((EpollEventLoop) eventLoop()).cleanArray();
array.maxBytes(maxBytesPerGatheringWrite);
in.forEachFlushedMessage(array); in.forEachFlushedMessage(array);
int cnt = array.count(); if (array.count() >= 1) {
if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially. // TODO: Handle the case where cnt == 1 specially.
if (!writeBytesMultiple(in, array, writeSpinCount)) { return writeBytesMultiple(in, array);
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else { // cnt == 0, which means the outbound buffer contained empty buffers only.
in.removeBytes(0);
} }
} else { } else {
ByteBuffer[] buffers = in.nioBuffers(); ByteBuffer[] buffers = in.nioBuffers();
int cnt = in.nioBufferCount(); int cnt = in.nioBufferCount();
if (cnt >= 1) { if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially. // TODO: Handle the case where cnt == 1 specially.
if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), writeSpinCount)) { return writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), maxBytesPerGatheringWrite);
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else { // cnt == 0, which means the outbound buffer contained empty buffers only.
in.removeBytes(0);
} }
} }
// cnt == 0, which means the outbound buffer contained empty buffers only.
return true; in.removeBytes(0);
return 0;
} }
@Override @Override

View File

@ -22,11 +22,15 @@ import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator; import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.WriteBufferWaterMark;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import static io.netty.channel.unix.Limits.SSIZE_MAX;
public class EpollChannelConfig extends DefaultChannelConfig { public class EpollChannelConfig extends DefaultChannelConfig {
final AbstractEpollChannel channel; final AbstractEpollChannel channel;
private volatile long maxBytesPerGatheringWrite = SSIZE_MAX;
EpollChannelConfig(AbstractEpollChannel channel) { EpollChannelConfig(AbstractEpollChannel channel) {
super(channel); super(channel);
@ -177,4 +181,12 @@ public class EpollChannelConfig extends DefaultChannelConfig {
protected final void autoReadCleared() { protected final void autoReadCleared() {
channel.clearEpollIn(); channel.clearEpollIn();
} }
final void setMaxBytesPerGatheringWrite(long maxBytesPerGatheringWrite) {
this.maxBytesPerGatheringWrite = maxBytesPerGatheringWrite;
}
final long getMaxBytesPerGatheringWrite() {
return maxBytesPerGatheringWrite;
}
} }

View File

@ -103,14 +103,14 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i
} }
@Override @Override
protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception { protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
Object msg = in.current(); Object msg = in.current();
if (msg instanceof FileDescriptor && socket.sendFd(((FileDescriptor) msg).intValue()) > 0) { if (msg instanceof FileDescriptor && socket.sendFd(((FileDescriptor) msg).intValue()) > 0) {
// File descriptor was written, so remove it. // File descriptor was written, so remove it.
in.remove(); in.remove();
return true; return 1;
} }
return super.doWriteSingle(in, writeSpinCount); return super.doWriteSingle(in);
} }
@Override @Override

View File

@ -51,6 +51,7 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement
if (PlatformDependent.canEnableTcpNoDelayByDefault()) { if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
setTcpNoDelay(true); setTcpNoDelay(true);
} }
calculateMaxBytesPerGatheringWrite();
} }
@Override @Override
@ -340,6 +341,7 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement
public EpollSocketChannelConfig setSendBufferSize(int sendBufferSize) { public EpollSocketChannelConfig setSendBufferSize(int sendBufferSize) {
try { try {
channel.socket.setSendBufferSize(sendBufferSize); channel.socket.setSendBufferSize(sendBufferSize);
calculateMaxBytesPerGatheringWrite();
return this; return this;
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); throw new ChannelException(e);
@ -632,4 +634,12 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement
super.setEpollMode(mode); super.setEpollMode(mode);
return this; return this;
} }
private void calculateMaxBytesPerGatheringWrite() {
// Multiply by 2 to give some extra space in case the OS can process write data faster than we can provide.
int newSendBufferSize = getSendBufferSize() << 1;
if (newSendBufferSize > 0) {
setMaxBytesPerGatheringWrite(getSendBufferSize() << 1);
}
}
} }

View File

@ -15,18 +15,30 @@
*/ */
package io.netty.channel.epoll; package io.netty.channel.epoll;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.unix.Errors.NativeIoException;
import io.netty.channel.unix.NativeInetAddress; import io.netty.channel.unix.NativeInetAddress;
import io.netty.channel.unix.PeerCredentials; import io.netty.channel.unix.PeerCredentials;
import io.netty.channel.unix.Socket; import io.netty.channel.unix.Socket;
import io.netty.util.internal.ThrowableUtil;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.nio.channels.ClosedChannelException;
import static io.netty.channel.unix.Errors.ERRNO_EPIPE_NEGATIVE;
import static io.netty.channel.unix.Errors.ioResult;
import static io.netty.channel.unix.Errors.newConnectionResetException;
/** /**
* A socket which provides access Linux native methods. * A socket which provides access Linux native methods.
*/ */
final class LinuxSocket extends Socket { final class LinuxSocket extends Socket {
private static final long MAX_UINT32_T = 0xFFFFFFFFL; private static final long MAX_UINT32_T = 0xFFFFFFFFL;
private static final NativeIoException SENDFILE_CONNECTION_RESET_EXCEPTION =
newConnectionResetException("syscall:sendfile(...)", ERRNO_EPIPE_NEGATIVE);
private static final ClosedChannelException SENDFILE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), Native.class, "sendfile(...)");
public LinuxSocket(int fd) { public LinuxSocket(int fd) {
super(fd); super(fd);
@ -140,6 +152,18 @@ final class LinuxSocket extends Socket {
return getPeerCredentials(intValue()); return getPeerCredentials(intValue());
} }
long sendFile(DefaultFileRegion src, long baseOffset, long offset, long length) throws IOException {
// Open the file-region as it may be created via the lazy constructor. This is needed as we directly access
// the FileChannel field via JNI.
src.open();
long res = sendFile(intValue(), src, baseOffset, offset, length);
if (res >= 0) {
return res;
}
return ioResult("sendfile", (int) res, SENDFILE_CONNECTION_RESET_EXCEPTION, SENDFILE_CLOSED_CHANNEL_EXCEPTION);
}
public static LinuxSocket newSocketStream() { public static LinuxSocket newSocketStream() {
return new LinuxSocket(newSocketStream0()); return new LinuxSocket(newSocketStream0());
} }
@ -152,6 +176,9 @@ final class LinuxSocket extends Socket {
return new LinuxSocket(newSocketDomain0()); return new LinuxSocket(newSocketDomain0());
} }
private static native long sendFile(int socketFd, DefaultFileRegion src, long baseOffset,
long offset, long length) throws IOException;
private static native int getTcpDeferAccept(int fd) throws IOException; private static native int getTcpDeferAccept(int fd) throws IOException;
private static native int isTcpQuickAck(int fd) throws IOException; private static native int isTcpQuickAck(int fd) throws IOException;
private static native int isTcpCork(int fd) throws IOException; private static native int isTcpCork(int fd) throws IOException;

View File

@ -15,13 +15,12 @@
*/ */
package io.netty.channel.epoll; package io.netty.channel.epoll;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.unix.Errors.NativeIoException; import io.netty.channel.unix.Errors.NativeIoException;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.Socket; import io.netty.channel.unix.Socket;
import io.netty.util.internal.NativeLibraryLoader; import io.netty.util.internal.NativeLibraryLoader;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.SystemPropertyUtil;
import io.netty.channel.unix.FileDescriptor;
import io.netty.util.internal.ThrowableUtil; import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -76,19 +75,14 @@ public final class Native {
public static final int TCP_MD5SIG_MAXKEYLEN = tcpMd5SigMaxKeyLen(); public static final int TCP_MD5SIG_MAXKEYLEN = tcpMd5SigMaxKeyLen();
public static final String KERNEL_VERSION = kernelVersion(); public static final String KERNEL_VERSION = kernelVersion();
private static final NativeIoException SENDFILE_CONNECTION_RESET_EXCEPTION;
private static final NativeIoException SENDMMSG_CONNECTION_RESET_EXCEPTION; private static final NativeIoException SENDMMSG_CONNECTION_RESET_EXCEPTION;
private static final NativeIoException SPLICE_CONNECTION_RESET_EXCEPTION; private static final NativeIoException SPLICE_CONNECTION_RESET_EXCEPTION;
private static final ClosedChannelException SENDFILE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), Native.class, "sendfile(...)");
private static final ClosedChannelException SENDMMSG_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( private static final ClosedChannelException SENDMMSG_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), Native.class, "sendmmsg(...)"); new ClosedChannelException(), Native.class, "sendmmsg(...)");
private static final ClosedChannelException SPLICE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( private static final ClosedChannelException SPLICE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), Native.class, "splice(...)"); new ClosedChannelException(), Native.class, "splice(...)");
static { static {
SENDFILE_CONNECTION_RESET_EXCEPTION = newConnectionResetException("syscall:sendfile(...)",
ERRNO_EPIPE_NEGATIVE);
SENDMMSG_CONNECTION_RESET_EXCEPTION = newConnectionResetException("syscall:sendmmsg(...)", SENDMMSG_CONNECTION_RESET_EXCEPTION = newConnectionResetException("syscall:sendmmsg(...)",
ERRNO_EPIPE_NEGATIVE); ERRNO_EPIPE_NEGATIVE);
SPLICE_CONNECTION_RESET_EXCEPTION = newConnectionResetException("syscall:splice(...)", SPLICE_CONNECTION_RESET_EXCEPTION = newConnectionResetException("syscall:splice(...)",
@ -161,22 +155,6 @@ public final class Native {
private static native int splice0(int fd, long offIn, int fdOut, long offOut, long len); private static native int splice0(int fd, long offIn, int fdOut, long offOut, long len);
public static long sendfile(
int dest, DefaultFileRegion src, long baseOffset, long offset, long length) throws IOException {
// Open the file-region as it may be created via the lazy constructor. This is needed as we directly access
// the FileChannel field directly via JNI
src.open();
long res = sendfile0(dest, src, baseOffset, offset, length);
if (res >= 0) {
return res;
}
return ioResult("sendfile", (int) res, SENDFILE_CONNECTION_RESET_EXCEPTION, SENDFILE_CLOSED_CHANNEL_EXCEPTION);
}
private static native long sendfile0(
int dest, DefaultFileRegion src, long baseOffset, long offset, long length) throws IOException;
public static int sendmmsg( public static int sendmmsg(
int fd, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, int offset, int len) throws IOException { int fd, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, int offset, int len) throws IOException {
int res = sendmmsg0(fd, msgs, offset, len); int res = sendmmsg0(fd, msgs, offset, len);

View File

@ -26,6 +26,7 @@ import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException; import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
@ -47,6 +48,7 @@ import java.nio.channels.UnresolvedAddressException;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr; import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkNotNull;
@ -299,52 +301,24 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
return localReadAmount; return localReadAmount;
} }
protected final int doWriteBytes(ByteBuf buf, int writeSpinCount) throws Exception { protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
int readableBytes = buf.readableBytes();
int writtenBytes = 0;
if (buf.hasMemoryAddress()) { if (buf.hasMemoryAddress()) {
long memoryAddress = buf.memoryAddress(); int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
int readerIndex = buf.readerIndex(); if (localFlushedAmount > 0) {
int writerIndex = buf.writerIndex(); in.removeBytes(localFlushedAmount);
for (int i = writeSpinCount; i > 0; --i) { return 1;
int localFlushedAmount = socket.writeAddress(memoryAddress, readerIndex, writerIndex);
if (localFlushedAmount > 0) {
writtenBytes += localFlushedAmount;
if (writtenBytes == readableBytes) {
return writtenBytes;
}
readerIndex += localFlushedAmount;
} else {
break;
}
} }
} else { } else {
ByteBuffer nioBuf; final ByteBuffer nioBuf = buf.nioBufferCount() == 1 ?
if (buf.nioBufferCount() == 1) { buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
nioBuf = buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()); int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
} else { if (localFlushedAmount > 0) {
nioBuf = buf.nioBuffer(); nioBuf.position(nioBuf.position() + localFlushedAmount);
} in.removeBytes(localFlushedAmount);
for (int i = writeSpinCount; i > 0; --i) { return 1;
int pos = nioBuf.position();
int limit = nioBuf.limit();
int localFlushedAmount = socket.write(nioBuf, pos, limit);
if (localFlushedAmount > 0) {
nioBuf.position(pos + localFlushedAmount);
writtenBytes += localFlushedAmount;
if (writtenBytes == readableBytes) {
return writtenBytes;
}
} else {
break;
}
} }
} }
if (writtenBytes < readableBytes) { return WRITE_STATUS_SNDBUF_FULL;
// Returned EAGAIN need to wait until we are allowed to write again.
writeFilter(true);
}
return writtenBytes;
} }
final boolean shouldBreakReadReady(ChannelConfig config) { final boolean shouldBreakReadReady(ChannelConfig config) {

View File

@ -28,6 +28,7 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultFileRegion; import io.netty.channel.DefaultFileRegion;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.FileRegion; import io.netty.channel.FileRegion;
import io.netty.channel.internal.ChannelUtils;
import io.netty.channel.socket.DuplexChannel; import io.netty.channel.socket.DuplexChannel;
import io.netty.channel.unix.IovArray; import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.SocketWritableByteChannel; import io.netty.channel.unix.SocketWritableByteChannel;
@ -44,6 +45,9 @@ import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel; import java.nio.channels.WritableByteChannel;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
@UnstableApi @UnstableApi
public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel implements DuplexChannel { public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel implements DuplexChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractKQueueStreamChannel.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractKQueueStreamChannel.class);
@ -51,8 +55,13 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
private static final String EXPECTED_TYPES = private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " + " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(DefaultFileRegion.class) + ')'; StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
private WritableByteChannel byteChannel; private WritableByteChannel byteChannel;
private final Runnable flushTask = new Runnable() {
@Override
public void run() {
flush();
}
};
AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, boolean active) { AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, boolean active) {
super(parent, fd, active, true); super(parent, fd, active, true);
@ -78,285 +87,276 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
/** /**
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
* @param buf the {@link ByteBuf} from which the bytes should be written * @param in the collection which contains objects to write.
* @param buf the {@link ByteBuf} from which the bytes should be written
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
* data was accepted</li>
* </ul>
*/ */
private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf, int writeSpinCount) throws Exception { private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
int readableBytes = buf.readableBytes(); int readableBytes = buf.readableBytes();
if (readableBytes == 0) { if (readableBytes == 0) {
in.remove(); in.remove();
return true; return 0;
} }
if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) { if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
int writtenBytes = doWriteBytes(buf, writeSpinCount); return doWriteBytes(in, buf);
in.removeBytes(writtenBytes);
return writtenBytes == readableBytes;
} else { } else {
ByteBuffer[] nioBuffers = buf.nioBuffers(); ByteBuffer[] nioBuffers = buf.nioBuffers();
return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes, writeSpinCount); return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
config().getMaxBytesPerGatheringWrite());
} }
} }
private boolean writeBytesMultiple( private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
ChannelOutboundBuffer in, IovArray array, int writeSpinCount) throws IOException { // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
// SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
long expectedWrittenBytes = array.size(); // make a best effort to adjust as OS behavior changes.
final long initialExpectedWrittenBytes = expectedWrittenBytes; if (attempted == written) {
if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
int cnt = array.count(); config().setMaxBytesPerGatheringWrite(attempted << 1);
}
} else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
config().setMaxBytesPerGatheringWrite(attempted >>> 1);
}
}
/**
* Write multiple bytes via {@link IovArray}.
* @param in the collection which contains objects to write.
* @param array The array which contains the content to write.
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
* no data was accepted</li>
* </ul>
* @throws IOException If an I/O exception occurs during write.
*/
private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
final long expectedWrittenBytes = array.size();
assert expectedWrittenBytes != 0; assert expectedWrittenBytes != 0;
final int cnt = array.count();
assert cnt != 0; assert cnt != 0;
boolean done = false; final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
int offset = 0; if (localWrittenBytes > 0) {
int end = offset + cnt; adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
for (int i = writeSpinCount; i > 0; --i) { in.removeBytes(localWrittenBytes);
long localWrittenBytes = socket.writevAddresses(array.memoryAddress(offset), cnt); return 1;
if (localWrittenBytes == 0) {
break;
}
expectedWrittenBytes -= localWrittenBytes;
if (expectedWrittenBytes == 0) {
// Written everything, just break out here (fast-path)
done = true;
break;
}
do {
long bytes = array.processWritten(offset, localWrittenBytes);
if (bytes == -1) {
// incomplete write
break;
} else {
offset++;
cnt--;
localWrittenBytes -= bytes;
}
} while (offset < end && localWrittenBytes > 0);
} }
in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes); return WRITE_STATUS_SNDBUF_FULL;
return done;
} }
private boolean writeBytesMultiple( /**
ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, * Write multiple bytes via {@link ByteBuffer} array.
int nioBufferCnt, long expectedWrittenBytes, int writeSpinCount) throws IOException { * @param in the collection which contains objects to write.
* @param nioBuffers The buffers to write.
* @param nioBufferCnt The number of buffers to write.
* @param expectedWrittenBytes The number of bytes we expect to write.
* @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
* no data was accepted</li>
* </ul>
* @throws IOException If an I/O exception occurs during write.
*/
private int writeBytesMultiple(
ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
long maxBytesPerGatheringWrite) throws IOException {
assert expectedWrittenBytes != 0; assert expectedWrittenBytes != 0;
final long initialExpectedWrittenBytes = expectedWrittenBytes; if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
expectedWrittenBytes = maxBytesPerGatheringWrite;
boolean done = false;
int offset = 0;
int end = offset + nioBufferCnt;
for (int i = writeSpinCount; i > 0; --i) {
long localWrittenBytes = socket.writev(nioBuffers, offset, nioBufferCnt);
if (localWrittenBytes == 0) {
break;
}
expectedWrittenBytes -= localWrittenBytes;
if (expectedWrittenBytes == 0) {
// Written everything, just break out here (fast-path)
done = true;
break;
}
do {
ByteBuffer buffer = nioBuffers[offset];
int pos = buffer.position();
int bytes = buffer.limit() - pos;
if (bytes > localWrittenBytes) {
buffer.position(pos + (int) localWrittenBytes);
// incomplete write
break;
} else {
offset++;
nioBufferCnt--;
localWrittenBytes -= bytes;
}
} while (offset < end && localWrittenBytes > 0);
} }
in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes); final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
return done; if (localWrittenBytes > 0) {
adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
return 1;
}
return WRITE_STATUS_SNDBUF_FULL;
} }
/** /**
* Write a {@link DefaultFileRegion} * Write a {@link DefaultFileRegion}
* * @param in the collection which contains objects to write.
* @param region the {@link DefaultFileRegion} from which the bytes should be written * @param region the {@link DefaultFileRegion} from which the bytes should be written
* @return amount the amount of written bytes * @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
* no data was accepted</li>
* </ul>
*/ */
private boolean writeDefaultFileRegion( private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
ChannelOutboundBuffer in, DefaultFileRegion region, int writeSpinCount) throws Exception {
final long regionCount = region.count(); final long regionCount = region.count();
if (region.transferred() >= regionCount) { if (region.transferred() >= regionCount) {
in.remove(); in.remove();
return true; return 0;
}
final long baseOffset = region.position();
boolean done = false;
long flushedAmount = 0;
for (int i = writeSpinCount; i > 0; --i) {
final long offset = region.transferred();
final long localFlushedAmount = socket.sendFile(region, baseOffset, offset, regionCount - offset);
if (localFlushedAmount == 0) {
break;
}
flushedAmount += localFlushedAmount;
if (region.transferred() >= regionCount) {
done = true;
break;
}
} }
final long offset = region.transferred();
final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
if (flushedAmount > 0) { if (flushedAmount > 0) {
in.progress(flushedAmount); in.progress(flushedAmount);
if (region.transferred() >= regionCount) {
in.remove();
}
return 1;
} }
return WRITE_STATUS_SNDBUF_FULL;
if (done) {
in.remove();
}
return done;
} }
private boolean writeFileRegion( /**
ChannelOutboundBuffer in, FileRegion region, final int writeSpinCount) throws Exception { * Write a {@link FileRegion}
* @param in the collection which contains objects to write.
* @param region the {@link FileRegion} from which the bytes should be written
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
* data was accepted</li>
* </ul>
*/
private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
if (region.transferred() >= region.count()) { if (region.transferred() >= region.count()) {
in.remove(); in.remove();
return true; return 0;
} }
boolean done = false;
long flushedAmount = 0;
if (byteChannel == null) { if (byteChannel == null) {
byteChannel = new KQueueSocketWritableByteChannel(); byteChannel = new KQueueSocketWritableByteChannel();
} }
for (int i = writeSpinCount; i > 0; --i) { final long flushedAmount = region.transferTo(byteChannel, region.transferred());
final long localFlushedAmount = region.transferTo(byteChannel, region.transferred());
if (localFlushedAmount == 0) {
break;
}
flushedAmount += localFlushedAmount;
if (region.transferred() >= region.count()) {
done = true;
break;
}
}
if (flushedAmount > 0) { if (flushedAmount > 0) {
in.progress(flushedAmount); in.progress(flushedAmount);
if (region.transferred() >= region.count()) {
in.remove();
}
return 1;
} }
return WRITE_STATUS_SNDBUF_FULL;
if (done) {
in.remove();
}
return done;
} }
@Override @Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception { protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = config().getWriteSpinCount(); int writeSpinCount = config().getWriteSpinCount();
for (;;) { do {
final int msgCount = in.size(); final int msgCount = in.size();
// Do gathering write if the outbound buffer entries start with more than one ByteBuf.
if (msgCount == 0) { if (msgCount > 1 && in.current() instanceof ByteBuf) {
writeSpinCount -= doWriteMultiple(in);
} else if (msgCount == 0) {
// Wrote all messages. // Wrote all messages.
writeFilter(false); writeFilter(false);
// Return here so we not set the EPOLLOUT flag. // Return here so we don't set the WRITE flag.
return; return;
}
// Do gathering write if the outbounf buffer entries start with more than one ByteBuf.
if (msgCount > 1 && in.current() instanceof ByteBuf) {
if (!doWriteMultiple(in, writeSpinCount)) {
// Break the loop and so set EPOLLOUT flag.
break;
}
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
} else { // msgCount == 1 } else { // msgCount == 1
if (!doWriteSingle(in, writeSpinCount)) { writeSpinCount -= doWriteSingle(in);
// Break the loop and so set EPOLLOUT flag.
break;
}
} }
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
} while (writeSpinCount > 0);
if (writeSpinCount == 0) {
// We used our writeSpin quantum, and should try to write again later.
eventLoop().execute(flushTask);
} else {
// Underlying descriptor can not accept all data currently, so set the WRITE flag to be woken up
// when it can accept more data.
writeFilter(true);
} }
// Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
// when it can accept more data.
writeFilter(true);
} }
protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception { /**
* Attempt to write a single object.
* @param in the collection which contains objects to write.
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
* data was accepted</li>
* </ul>
* @throws Exception If an I/O error occurs.
*/
protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
// The outbound buffer contains only one message or it contains a file region. // The outbound buffer contains only one message or it contains a file region.
Object msg = in.current(); Object msg = in.current();
if (msg instanceof ByteBuf) { if (msg instanceof ByteBuf) {
if (!writeBytes(in, (ByteBuf) msg, writeSpinCount)) { return writeBytes(in, (ByteBuf) msg);
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else if (msg instanceof DefaultFileRegion) { } else if (msg instanceof DefaultFileRegion) {
if (!writeDefaultFileRegion(in, (DefaultFileRegion) msg, writeSpinCount)) { return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else if (msg instanceof FileRegion) { } else if (msg instanceof FileRegion) {
if (!writeFileRegion(in, (FileRegion) msg, writeSpinCount)) { return writeFileRegion(in, (FileRegion) msg);
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else { } else {
// Should never reach here. // Should never reach here.
throw new Error(); throw new Error();
} }
return true;
} }
private boolean doWriteMultiple(ChannelOutboundBuffer in, int writeSpinCount) throws Exception { /**
* Attempt to write multiple {@link ByteBuf} objects.
* @param in the collection which contains objects to write.
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
* data was accepted</li>
* </ul>
* @throws Exception If an I/O error occurs.
*/
private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
if (PlatformDependent.hasUnsafe()) { if (PlatformDependent.hasUnsafe()) {
// this means we can cast to IovArray and write the IovArray directly.
IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray(); IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
array.maxBytes(maxBytesPerGatheringWrite);
in.forEachFlushedMessage(array); in.forEachFlushedMessage(array);
int cnt = array.count(); if (array.count() >= 1) {
if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially. // TODO: Handle the case where cnt == 1 specially.
if (!writeBytesMultiple(in, array, writeSpinCount)) { return writeBytesMultiple(in, array);
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else { // cnt == 0, which means the outbound buffer contained empty buffers only.
in.removeBytes(0);
} }
} else { } else {
ByteBuffer[] buffers = in.nioBuffers(); ByteBuffer[] buffers = in.nioBuffers();
int cnt = in.nioBufferCount(); int cnt = in.nioBufferCount();
if (cnt >= 1) { if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially. // TODO: Handle the case where cnt == 1 specially.
if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), writeSpinCount)) { return writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), maxBytesPerGatheringWrite);
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else { // cnt == 0, which means the outbound buffer contained empty buffers only.
in.removeBytes(0);
} }
} }
// cnt == 0, which means the outbound buffer contained empty buffers only.
return true; in.removeBytes(0);
return 0;
} }
@Override @Override

View File

@ -83,7 +83,7 @@ final class BsdSocket extends Socket {
long sendFile(DefaultFileRegion src, long baseOffset, long offset, long length) throws IOException { long sendFile(DefaultFileRegion src, long baseOffset, long offset, long length) throws IOException {
// Open the file-region as it may be created via the lazy constructor. This is needed as we directly access // Open the file-region as it may be created via the lazy constructor. This is needed as we directly access
// the FileChannel field directly via JNI // the FileChannel field via JNI.
src.open(); src.open();
long res = sendFile(intValue(), src, baseOffset, offset, length); long res = sendFile(intValue(), src, baseOffset, offset, length);

View File

@ -26,11 +26,14 @@ import io.netty.util.internal.UnstableApi;
import java.util.Map; import java.util.Map;
import static io.netty.channel.kqueue.KQueueChannelOption.RCV_ALLOC_TRANSPORT_PROVIDES_GUESS; import static io.netty.channel.kqueue.KQueueChannelOption.RCV_ALLOC_TRANSPORT_PROVIDES_GUESS;
import static io.netty.channel.unix.Limits.SSIZE_MAX;
import static java.lang.Math.min;
@UnstableApi @UnstableApi
public class KQueueChannelConfig extends DefaultChannelConfig { public class KQueueChannelConfig extends DefaultChannelConfig {
final AbstractKQueueChannel channel; final AbstractKQueueChannel channel;
private volatile boolean transportProvidesGuess; private volatile boolean transportProvidesGuess;
private volatile long maxBytesPerGatheringWrite = SSIZE_MAX;
KQueueChannelConfig(AbstractKQueueChannel channel) { KQueueChannelConfig(AbstractKQueueChannel channel) {
super(channel); super(channel);
@ -153,4 +156,12 @@ public class KQueueChannelConfig extends DefaultChannelConfig {
protected final void autoReadCleared() { protected final void autoReadCleared() {
channel.clearReadFilter(); channel.clearReadFilter();
} }
final void setMaxBytesPerGatheringWrite(long maxBytesPerGatheringWrite) {
this.maxBytesPerGatheringWrite = min(SSIZE_MAX, maxBytesPerGatheringWrite);
}
final long getMaxBytesPerGatheringWrite() {
return maxBytesPerGatheringWrite;
}
} }

View File

@ -96,14 +96,14 @@ public final class KQueueDomainSocketChannel extends AbstractKQueueStreamChannel
} }
@Override @Override
protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception { protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
Object msg = in.current(); Object msg = in.current();
if (msg instanceof FileDescriptor && socket.sendFd(((FileDescriptor) msg).intValue()) > 0) { if (msg instanceof FileDescriptor && socket.sendFd(((FileDescriptor) msg).intValue()) > 0) {
// File descriptor was written, so remove it. // File descriptor was written, so remove it.
in.remove(); in.remove();
return true; return 1;
} }
return super.doWriteSingle(in, writeSpinCount); return super.doWriteSingle(in);
} }
@Override @Override

View File

@ -50,6 +50,7 @@ public final class KQueueSocketChannelConfig extends KQueueChannelConfig impleme
if (PlatformDependent.canEnableTcpNoDelayByDefault()) { if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
setTcpNoDelay(true); setTcpNoDelay(true);
} }
calculateMaxBytesPerGatheringWrite();
} }
@Override @Override
@ -256,6 +257,7 @@ public final class KQueueSocketChannelConfig extends KQueueChannelConfig impleme
public KQueueSocketChannelConfig setSendBufferSize(int sendBufferSize) { public KQueueSocketChannelConfig setSendBufferSize(int sendBufferSize) {
try { try {
channel.socket.setSendBufferSize(sendBufferSize); channel.socket.setSendBufferSize(sendBufferSize);
calculateMaxBytesPerGatheringWrite();
return this; return this;
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); throw new ChannelException(e);
@ -383,4 +385,12 @@ public final class KQueueSocketChannelConfig extends KQueueChannelConfig impleme
super.setMessageSizeEstimator(estimator); super.setMessageSizeEstimator(estimator);
return this; return this;
} }
private void calculateMaxBytesPerGatheringWrite() {
// Multiply by 2 to give some extra space in case the OS can process write data faster than we can provide.
int newSendBufferSize = getSendBufferSize() << 1;
if (newSendBufferSize > 0) {
setMaxBytesPerGatheringWrite(getSendBufferSize() << 1);
}
}
} }

View File

@ -19,6 +19,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <unistd.h> #include <unistd.h>
#include <sys/uio.h> #include <sys/uio.h>
#include <limits.h>
#include "netty_unix_errors.h" #include "netty_unix_errors.h"
#include "netty_unix_filedescriptor.h" #include "netty_unix_filedescriptor.h"
@ -110,40 +111,103 @@ static jlong netty_unix_filedescriptor_writevAddresses(JNIEnv* env, jclass clazz
return _writev(env, clazz, fd, iov, length); return _writev(env, clazz, fd, iov, length);
} }
static jlong netty_unix_filedescriptor_writev(JNIEnv* env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length) { static jlong netty_unix_filedescriptor_writev(JNIEnv* env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length, jlong maxBytesToWrite) {
struct iovec iov[length]; struct iovec iov[length];
int iovidx = 0; struct iovec* iovptr = NULL;
int i; int i;
int num = offset + length; int num = offset + length;
for (i = offset; i < num; i++) { if (posFieldId != NULL && limitFieldId != NULL) {
jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i); for (i = offset; i < num; i++) {
jint pos; jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i);
// Get the current position using the (*env)->GetIntField if possible and fallback jint pos = (*env)->GetIntField(env, bufObj, posFieldId);
// to slower (*env)->CallIntMethod(...) if needed jint limit = (*env)->GetIntField(env, bufObj, limitFieldId);
if (posFieldId == NULL) { size_t bytesLength = (size_t) (limit - pos);
pos = (*env)->CallIntMethod(env, bufObj, posId, NULL); if (bytesLength > maxBytesToWrite && i != offset) {
} else { bytesLength = num - i;
pos = (*env)->GetIntField(env, bufObj, posFieldId); break;
} }
jint limit; maxBytesToWrite -= bytesLength;
// Get the current limit using the (*env)->GetIntField if possible and fallback void* buffer = (*env)->GetDirectBufferAddress(env, bufObj);
// to slower (*env)->CallIntMethod(...) if needed // We check that GetDirectBufferAddress will not return NULL in OnLoad
if (limitFieldId == NULL) { iovptr->iov_base = buffer + pos;
limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL); iovptr->iov_len = bytesLength;
} else { ++iovptr;
limit = (*env)->GetIntField(env, bufObj, limitFieldId);
}
void* buffer = (*env)->GetDirectBufferAddress(env, bufObj);
// We check that GetDirectBufferAddress will not return NULL in OnLoad
iov[iovidx].iov_base = buffer + pos;
iov[iovidx].iov_len = (size_t) (limit - pos);
iovidx++;
// Explicit delete local reference as otherwise the local references will only be released once the native method returns. // Explicit delete local reference as otherwise the local references will only be released once the native method returns.
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created. // Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
// //
// See https://github.com/netty/netty/issues/2623 // See https://github.com/netty/netty/issues/2623
(*env)->DeleteLocalRef(env, bufObj); (*env)->DeleteLocalRef(env, bufObj);
}
} else if (posFieldId != NULL && limitFieldId == NULL) {
for (i = offset; i < num; i++) {
jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i);
jint pos = (*env)->GetIntField(env, bufObj, posFieldId);
jint limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL);
size_t bytesLength = (size_t) (limit - pos);
if (bytesLength > maxBytesToWrite && i != offset) {
bytesLength = num - i;
break;
}
maxBytesToWrite -= bytesLength;
void* buffer = (*env)->GetDirectBufferAddress(env, bufObj);
// We check that GetDirectBufferAddress will not return NULL in OnLoad
iovptr->iov_base = buffer + pos;
iovptr->iov_len = bytesLength;
++iovptr;
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
//
// See https://github.com/netty/netty/issues/2623
(*env)->DeleteLocalRef(env, bufObj);
}
} else if (limitFieldId != NULL) {
for (i = offset; i < num; i++) {
jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i);
jint pos = (*env)->CallIntMethod(env, bufObj, posId, NULL);
jint limit = (*env)->GetIntField(env, bufObj, limitFieldId);
size_t bytesLength = (size_t) (limit - pos);
if (bytesLength > maxBytesToWrite && i != offset) {
bytesLength = num - i;
break;
}
maxBytesToWrite -= bytesLength;
void* buffer = (*env)->GetDirectBufferAddress(env, bufObj);
// We check that GetDirectBufferAddress will not return NULL in OnLoad
iovptr->iov_base = buffer + pos;
iovptr->iov_len = bytesLength;
++iovptr;
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
//
// See https://github.com/netty/netty/issues/2623
(*env)->DeleteLocalRef(env, bufObj);
}
} else {
for (i = offset; i < num; i++) {
jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i);
jint pos = (*env)->CallIntMethod(env, bufObj, posId, NULL);
jint limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL);
size_t bytesLength = (size_t) (limit - pos);
if (bytesLength > maxBytesToWrite && i != offset) {
bytesLength = num - i;
break;
}
maxBytesToWrite -= bytesLength;
void* buffer = (*env)->GetDirectBufferAddress(env, bufObj);
// We check that GetDirectBufferAddress will not return NULL in OnLoad
iovptr->iov_base = buffer + pos;
iovptr->iov_len = bytesLength;
++iovptr;
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
//
// See https://github.com/netty/netty/issues/2623
(*env)->DeleteLocalRef(env, bufObj);
}
} }
return _writev(env, clazz, fd, iov, length); return _writev(env, clazz, fd, iov, length);
} }
@ -195,7 +259,7 @@ static const JNINativeMethod method_table[] = {
{ "write", "(ILjava/nio/ByteBuffer;II)I", (void *) netty_unix_filedescriptor_write }, { "write", "(ILjava/nio/ByteBuffer;II)I", (void *) netty_unix_filedescriptor_write },
{ "writeAddress", "(IJII)I", (void *) netty_unix_filedescriptor_writeAddress }, { "writeAddress", "(IJII)I", (void *) netty_unix_filedescriptor_writeAddress },
{ "writevAddresses", "(IJI)J", (void *) netty_unix_filedescriptor_writevAddresses }, { "writevAddresses", "(IJI)J", (void *) netty_unix_filedescriptor_writevAddresses },
{ "writev", "(I[Ljava/nio/ByteBuffer;II)J", (void *) netty_unix_filedescriptor_writev }, { "writev", "(I[Ljava/nio/ByteBuffer;IIJ)J", (void *) netty_unix_filedescriptor_writev },
{ "read", "(ILjava/nio/ByteBuffer;II)I", (void *) netty_unix_filedescriptor_read }, { "read", "(ILjava/nio/ByteBuffer;II)I", (void *) netty_unix_filedescriptor_read },
{ "readAddress", "(IJII)I", (void *) netty_unix_filedescriptor_readAddress }, { "readAddress", "(IJII)I", (void *) netty_unix_filedescriptor_readAddress },
{ "newPipe", "()J", (void *) netty_unix_filedescriptor_newPipe } { "newPipe", "()J", (void *) netty_unix_filedescriptor_newPipe }

View File

@ -25,10 +25,9 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import static io.netty.channel.unix.Errors.ioResult; import static io.netty.channel.unix.Errors.ioResult;
import static io.netty.channel.unix.Errors.newIOException; import static io.netty.channel.unix.Errors.newIOException;
import static io.netty.channel.unix.LimitsStaticallyReferencedJniMethods.iovMax; import static io.netty.channel.unix.Limits.IOV_MAX;
import static io.netty.channel.unix.LimitsStaticallyReferencedJniMethods.ssizeMax;
import static io.netty.channel.unix.LimitsStaticallyReferencedJniMethods.uioMaxIov;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.min;
/** /**
* Native {@link FileDescriptor} implementation which allows to wrap an {@code int} and provide a * Native {@link FileDescriptor} implementation which allows to wrap an {@code int} and provide a
@ -140,8 +139,8 @@ public class FileDescriptor {
WRITE_ADDRESS_CONNECTION_RESET_EXCEPTION, WRITE_ADDRESS_CLOSED_CHANNEL_EXCEPTION); WRITE_ADDRESS_CONNECTION_RESET_EXCEPTION, WRITE_ADDRESS_CLOSED_CHANNEL_EXCEPTION);
} }
public final long writev(ByteBuffer[] buffers, int offset, int length) throws IOException { public final long writev(ByteBuffer[] buffers, int offset, int length, long maxBytesToWrite) throws IOException {
long res = writev(fd, buffers, offset, length); long res = writev(fd, buffers, offset, min(IOV_MAX, length), maxBytesToWrite);
if (res >= 0) { if (res >= 0) {
return res; return res;
} }
@ -263,7 +262,7 @@ public class FileDescriptor {
private static native int write(int fd, ByteBuffer buf, int pos, int limit); private static native int write(int fd, ByteBuffer buf, int pos, int limit);
private static native int writeAddress(int fd, long address, int pos, int limit); private static native int writeAddress(int fd, long address, int pos, int limit);
private static native long writev(int fd, ByteBuffer[] buffers, int offset, int length); private static native long writev(int fd, ByteBuffer[] buffers, int offset, int length, long maxBytesToWrite);
private static native long writevAddresses(int fd, long memoryAddress, int length); private static native long writevAddresses(int fd, long memoryAddress, int length);
private static native int read(int fd, ByteBuffer buf, int pos, int limit); private static native int read(int fd, ByteBuffer buf, int pos, int limit);

View File

@ -23,6 +23,13 @@ import java.nio.ByteBuffer;
import static io.netty.channel.unix.Limits.IOV_MAX; import static io.netty.channel.unix.Limits.IOV_MAX;
import static io.netty.channel.unix.Limits.SSIZE_MAX; import static io.netty.channel.unix.Limits.SSIZE_MAX;
import static io.netty.util.internal.ObjectUtil.checkPositive;
import static io.netty.util.internal.PlatformDependent.allocateMemory;
import static io.netty.util.internal.PlatformDependent.directBufferAddress;
import static io.netty.util.internal.PlatformDependent.freeMemory;
import static io.netty.util.internal.PlatformDependent.putInt;
import static io.netty.util.internal.PlatformDependent.putLong;
import static java.lang.Math.min;
/** /**
* Represent an array of struct array and so can be passed directly over via JNI without the need to do any more * Represent an array of struct array and so can be passed directly over via JNI without the need to do any more
@ -61,9 +68,10 @@ public final class IovArray implements MessageProcessor {
private final long memoryAddress; private final long memoryAddress;
private int count; private int count;
private long size; private long size;
private long maxBytes = SSIZE_MAX;
public IovArray() { public IovArray() {
memoryAddress = PlatformDependent.allocateMemory(CAPACITY); memoryAddress = allocateMemory(CAPACITY);
} }
public void clear() { public void clear() {
@ -76,37 +84,18 @@ public final class IovArray implements MessageProcessor {
* {@code false} otherwise. * {@code false} otherwise.
*/ */
public boolean add(ByteBuf buf) { public boolean add(ByteBuf buf) {
int nioBufferCount = buf.nioBufferCount(); if (count == IOV_MAX) {
if (count + nioBufferCount > IOV_MAX) {
// No more room! // No more room!
return false; return false;
} } else if (buf.nioBufferCount() == 1) {
if (nioBufferCount == 1) {
final int len = buf.readableBytes(); final int len = buf.readableBytes();
if (len == 0) { return len == 0 || add(buf.memoryAddress(), buf.readerIndex(), len);
// No need to add an empty buffer.
// We return true here because we want ChannelOutboundBuffer.forEachFlushedMessage() to continue
// fetching the next buffers.
return true;
}
final long addr = buf.memoryAddress();
final int offset = buf.readerIndex();
return add(addr, offset, len);
} else { } else {
ByteBuffer[] buffers = buf.nioBuffers(); ByteBuffer[] buffers = buf.nioBuffers();
for (ByteBuffer nioBuffer : buffers) { for (ByteBuffer nioBuffer : buffers) {
int len = nioBuffer.remaining(); final int len = nioBuffer.remaining();
if (len == 0) { if (len != 0 && (!add(directBufferAddress(nioBuffer), nioBuffer.position(), len) || count == IOV_MAX)) {
// No need to add an empty buffer so just continue break;
continue;
}
int offset = nioBuffer.position();
long addr = PlatformDependent.directBufferAddress(nioBuffer);
if (!add(addr, offset, len)) {
return false;
} }
} }
return true; return true;
@ -119,11 +108,13 @@ public final class IovArray implements MessageProcessor {
return true; return true;
} }
final long baseOffset = memoryAddress(count++); final long baseOffset = memoryAddress(count);
final long lengthOffset = baseOffset + ADDRESS_SIZE; final long lengthOffset = baseOffset + ADDRESS_SIZE;
if (SSIZE_MAX - len < size) { // If there is at least 1 entry then we enforce the maximum bytes. We want to accept at least one entry so we
// If the size + len will overflow an SSIZE_MAX we stop populate the IovArray. This is done as linux // will attempt to write some data and make progress.
if (maxBytes - len < size && count > 0) {
// If the size + len will overflow SSIZE_MAX we stop populate the IovArray. This is done as linux
// not allow to write more bytes then SSIZE_MAX with one writev(...) call and so will // not allow to write more bytes then SSIZE_MAX with one writev(...) call and so will
// return 'EINVAL', which will raise an IOException. // return 'EINVAL', which will raise an IOException.
// //
@ -132,49 +123,20 @@ public final class IovArray implements MessageProcessor {
return false; return false;
} }
size += len; size += len;
++count;
if (ADDRESS_SIZE == 8) { if (ADDRESS_SIZE == 8) {
// 64bit // 64bit
PlatformDependent.putLong(baseOffset, addr + offset); putLong(baseOffset, addr + offset);
PlatformDependent.putLong(lengthOffset, len); putLong(lengthOffset, len);
} else { } else {
assert ADDRESS_SIZE == 4; assert ADDRESS_SIZE == 4;
PlatformDependent.putInt(baseOffset, (int) addr + offset); putInt(baseOffset, (int) addr + offset);
PlatformDependent.putInt(lengthOffset, len); putInt(lengthOffset, len);
} }
return true; return true;
} }
/**
* Process the written iov entries. This will return the length of the iov entry on the given index if it is
* smaller then the given {@code written} value. Otherwise it returns {@code -1}.
*/
public long processWritten(int index, long written) {
long baseOffset = memoryAddress(index);
long lengthOffset = baseOffset + ADDRESS_SIZE;
if (ADDRESS_SIZE == 8) {
// 64bit
long len = PlatformDependent.getLong(lengthOffset);
if (len > written) {
long offset = PlatformDependent.getLong(baseOffset);
PlatformDependent.putLong(baseOffset, offset + written);
PlatformDependent.putLong(lengthOffset, len - written);
return -1;
}
return len;
} else {
assert ADDRESS_SIZE == 4;
long len = PlatformDependent.getInt(lengthOffset);
if (len > written) {
int offset = PlatformDependent.getInt(baseOffset);
PlatformDependent.putInt(baseOffset, (int) (offset + written));
PlatformDependent.putInt(lengthOffset, (int) (len - written));
return -1;
}
return len;
}
}
/** /**
* Returns the number if iov entries. * Returns the number if iov entries.
*/ */
@ -189,6 +151,28 @@ public final class IovArray implements MessageProcessor {
return size; return size;
} }
/**
* Set the maximum amount of bytes that can be added to this {@link IovArray} via {@link #add(ByteBuf)}.
* <p>
* This will not impact the existing state of the {@link IovArray}, and only applies to subsequent calls to
* {@link #add(ByteBuf)}.
* <p>
* In order to ensure some progress is made at least one {@link ByteBuf} will be accepted even if it's size exceeds
* this value.
* @param maxBytes the maximum amount of bytes that can be added to this {@link IovArray} via {@link #add(ByteBuf)}.
*/
public void maxBytes(long maxBytes) {
this.maxBytes = min(SSIZE_MAX, checkPositive(maxBytes, "maxBytes"));
}
/**
* Get the maximum amount of bytes that can be added to this {@link IovArray} via {@link #add(ByteBuf)}.
* @return the maximum amount of bytes that can be added to this {@link IovArray} via {@link #add(ByteBuf)}.
*/
public long maxBytes() {
return maxBytes;
}
/** /**
* Returns the {@code memoryAddress} for the given {@code offset}. * Returns the {@code memoryAddress} for the given {@code offset}.
*/ */
@ -200,14 +184,11 @@ public final class IovArray implements MessageProcessor {
* Release the {@link IovArray}. Once release further using of it may crash the JVM! * Release the {@link IovArray}. Once release further using of it may crash the JVM!
*/ */
public void release() { public void release() {
PlatformDependent.freeMemory(memoryAddress); freeMemory(memoryAddress);
} }
@Override @Override
public boolean processMessage(Object msg) throws Exception { public boolean processMessage(Object msg) throws Exception {
if (msg instanceof ByteBuf) { return (msg instanceof ByteBuf) && add((ByteBuf) msg);
return add((ByteBuf) msg);
}
return false;
} }
} }

View File

@ -35,6 +35,8 @@ import java.util.Arrays;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import static java.lang.Math.min;
/** /**
* (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
* outbound write requests. * outbound write requests.
@ -367,6 +369,26 @@ public final class ChannelOutboundBuffer {
* </p> * </p>
*/ */
public ByteBuffer[] nioBuffers() { public ByteBuffer[] nioBuffers() {
return nioBuffers(Integer.MAX_VALUE, Integer.MAX_VALUE);
}
/**
* Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
* {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
* array and the total number of readable bytes of the NIO buffers respectively.
* <p>
* Note that the returned array is reused and thus should not escape
* {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
* Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
* </p>
* @param maxCount The maximum amount of buffers that will be added to the return value.
* @param maxBytes A hint toward the maximum number of bytes to include as part of the return value. Note that this
* value maybe exceeded because we make a best effort to include at least 1 {@link ByteBuffer}
* in the return value to ensure write progress is made.
*/
public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
assert maxCount > 0;
assert maxBytes > 0;
long nioBufferSize = 0; long nioBufferSize = 0;
int nioBufferCount = 0; int nioBufferCount = 0;
final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
@ -379,13 +401,14 @@ public final class ChannelOutboundBuffer {
final int readableBytes = buf.writerIndex() - readerIndex; final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes > 0) { if (readableBytes > 0) {
if (Integer.MAX_VALUE - readableBytes < nioBufferSize) { if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
// If the nioBufferSize + readableBytes will overflow an Integer we stop populate the // If the nioBufferSize + readableBytes will overflow maxBytes, and there is at least one entry
// ByteBuffer array. This is done as bsd/osx don't allow to write more bytes then // we stop populate the ByteBuffer array. This is done for 2 reasons:
// Integer.MAX_VALUE with one writev(...) call and so will return 'EINVAL', which will // 1. bsd/osx don't allow to write more bytes then Integer.MAX_VALUE with one writev(...) call
// raise an IOException. On Linux it may work depending on the // and so will return 'EINVAL', which will raise an IOException. On Linux it may work depending
// architecture and kernel but to be safe we also enforce the limit here. // on the architecture and kernel but to be safe we also enforce the limit here.
// This said writing more the Integer.MAX_VALUE is not a good idea anyway. // 2. There is no sense in putting more data in the array than is likely to be accepted by the
// OS.
// //
// See also: // See also:
// - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2 // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
@ -396,9 +419,9 @@ public final class ChannelOutboundBuffer {
int count = entry.count; int count = entry.count;
if (count == -1) { if (count == -1) {
//noinspection ConstantValueVariableUse //noinspection ConstantValueVariableUse
entry.count = count = buf.nioBufferCount(); entry.count = count = buf.nioBufferCount();
} }
int neededSpace = nioBufferCount + count; int neededSpace = min(maxCount, nioBufferCount + count);
if (neededSpace > nioBuffers.length) { if (neededSpace > nioBuffers.length) {
nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
NIO_BUFFERS.set(threadLocalMap, nioBuffers); NIO_BUFFERS.set(threadLocalMap, nioBuffers);
@ -410,7 +433,7 @@ public final class ChannelOutboundBuffer {
// derived buffer // derived buffer
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
} }
nioBuffers[nioBufferCount ++] = nioBuf; nioBuffers[nioBufferCount++] = nioBuf;
} else { } else {
ByteBuffer[] nioBufs = entry.bufs; ByteBuffer[] nioBufs = entry.bufs;
if (nioBufs == null) { if (nioBufs == null) {
@ -418,7 +441,18 @@ public final class ChannelOutboundBuffer {
// of Object allocation // of Object allocation
entry.bufs = nioBufs = buf.nioBuffers(); entry.bufs = nioBufs = buf.nioBuffers();
} }
nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount); for (int i = 0; i < nioBufs.length && nioBufferCount < maxCount; ++i) {
ByteBuffer nioBuf = nioBufs[i];
if (nioBuf == null) {
break;
} else if (!nioBuf.hasRemaining()) {
continue;
}
nioBuffers[nioBufferCount++] = nioBuf;
}
}
if (nioBufferCount == maxCount) {
break;
} }
} }
} }
@ -430,16 +464,6 @@ public final class ChannelOutboundBuffer {
return nioBuffers; return nioBuffers;
} }
private static int fillBufferArray(ByteBuffer[] nioBufs, ByteBuffer[] nioBuffers, int nioBufferCount) {
for (ByteBuffer nioBuf: nioBufs) {
if (nioBuf == null) {
break;
}
nioBuffers[nioBufferCount ++] = nioBuf;
}
return nioBufferCount;
}
private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) { private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
int newCapacity = array.length; int newCapacity = array.length;
do { do {

View File

@ -265,6 +265,13 @@ public class DefaultChannelConfig implements ChannelConfig {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"writeSpinCount must be a positive integer."); "writeSpinCount must be a positive integer.");
} }
// Integer.MAX_VALUE is used as a special value in the channel implementations to indicate the channel cannot
// accept any more data, and results in the writeOp being set on the selector (or execute a runnable which tries
// to flush later because the writeSpinCount quantum has been exhausted). This strategy prevents additional
// conditional logic in the channel implementations, and shouldn't be noticeable in practice.
if (writeSpinCount == Integer.MAX_VALUE) {
--writeSpinCount;
}
this.writeSpinCount = writeSpinCount; this.writeSpinCount = writeSpinCount;
return this; return this;
} }

View File

@ -0,0 +1,24 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.internal;
public final class ChannelUtils {
public static final int MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD = 4096;
public static final int WRITE_STATUS_SNDBUF_FULL = Integer.MAX_VALUE;
private ChannelUtils() {
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Internal utilities for channel implementations.
*/
package io.netty.channel.internal;

View File

@ -26,6 +26,7 @@ import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.FileRegion; import io.netty.channel.FileRegion;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.internal.ChannelUtils;
import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ChannelInputShutdownReadComplete; import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
@ -34,6 +35,8 @@ import java.io.IOException;
import java.nio.channels.SelectableChannel; import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
/** /**
* {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes. * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes.
*/ */
@ -161,12 +164,71 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
} }
} }
/**
* Write objects to the OS.
* @param in the collection which contains objects to write.
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
* data was accepted</li>
* </ul>
* @throws Exception if an I/O exception occurs during write.
*/
protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
Object msg = in.current();
if (msg == null) {
// Directly return here so incompleteWrite(...) is not called.
return 0;
}
return doWriteInternal(in, in.current());
}
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.isReadable()) {
in.remove();
return 0;
}
final int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (!buf.isReadable()) {
in.remove();
}
return 1;
}
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
if (region.transferred() >= region.count()) {
in.remove();
return 0;
}
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (region.transferred() >= region.count()) {
in.remove();
}
return 1;
}
} else {
// Should not reach here.
throw new Error();
}
return WRITE_STATUS_SNDBUF_FULL;
}
@Override @Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception { protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1; int writeSpinCount = config().getWriteSpinCount();
do {
boolean setOpWrite = false;
for (;;) {
Object msg = in.current(); Object msg = in.current();
if (msg == null) { if (msg == null) {
// Wrote all messages. // Wrote all messages.
@ -174,81 +236,10 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
// Directly return here so incompleteWrite(...) is not called. // Directly return here so incompleteWrite(...) is not called.
return; return;
} }
writeSpinCount -= doWriteInternal(in, msg);
} while (writeSpinCount > 0);
if (msg instanceof ByteBuf) { incompleteWrite(writeSpinCount < 0);
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
continue;
}
boolean done = false;
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
in.progress(flushedAmount);
if (done) {
in.remove();
} else {
// Break the loop and so incompleteWrite(...) is called.
break;
}
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
boolean done = region.transferred() >= region.count();
if (!done) {
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i--) {
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (region.transferred() >= region.count()) {
done = true;
break;
}
}
in.progress(flushedAmount);
}
if (done) {
in.remove();
} else {
// Break the loop and so incompleteWrite(...) is called.
break;
}
} else {
// Should not reach here.
throw new Error();
}
}
incompleteWrite(setOpWrite);
} }
@Override @Override

View File

@ -25,13 +25,13 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.FileRegion; import io.netty.channel.FileRegion;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.internal.SocketUtils;
import io.netty.channel.nio.AbstractNioByteChannel; import io.netty.channel.nio.AbstractNioByteChannel;
import io.netty.channel.socket.DefaultSocketChannelConfig; import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.socket.SocketChannelConfig;
import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SocketUtils;
import io.netty.util.internal.UnstableApi; import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -46,6 +46,8 @@ import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider; import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
/** /**
* {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation. * {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation.
*/ */
@ -355,75 +357,80 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
return region.transferTo(javaChannel(), position); return region.transferTo(javaChannel(), position);
} }
private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {
// By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
// SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
// make a best effort to adjust as OS behavior changes.
if (attempted == written) {
if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted << 1);
}
} else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted >>> 1);
}
}
@Override @Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception { protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) { SocketChannel ch = javaChannel();
int size = in.size(); int writeSpinCount = config().getWriteSpinCount();
if (size == 0) { do {
if (in.isEmpty()) {
// All written so clear OP_WRITE // All written so clear OP_WRITE
clearOpWrite(); clearOpWrite();
break; // Directly return here so incompleteWrite(...) is not called.
return;
} }
long writtenBytes = 0;
boolean done = false;
boolean setOpWrite = false;
// Ensure the pending writes are made of ByteBufs only. // Ensure the pending writes are made of ByteBufs only.
ByteBuffer[] nioBuffers = in.nioBuffers(); int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
int nioBufferCnt = in.nioBufferCount(); int nioBufferCnt = in.nioBufferCount();
long expectedWrittenBytes = in.nioBufferSize();
SocketChannel ch = javaChannel();
// Always us nioBuffers() to workaround data-corruption. // Always us nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761 // See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) { switch (nioBufferCnt) {
case 0: case 0:
// We have something else beside ByteBuffers to write so fallback to normal writes. // We have something else beside ByteBuffers to write so fallback to normal writes.
super.doWrite(in); writeSpinCount -= doWrite0(in);
return; break;
case 1: case 1: {
// Only one ByteBuf so use non-gathering write // Only one ByteBuf so use non-gathering write
ByteBuffer nioBuffer = nioBuffers[0]; // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { // to check if the total size of all the buffers is non-zero.
final int localWrittenBytes = ch.write(nioBuffer); ByteBuffer buffer = nioBuffers[0];
if (localWrittenBytes == 0) { int attemptedBytes = buffer.remaining();
setOpWrite = true; final int localWrittenBytes = ch.write(buffer);
break; if (localWrittenBytes <= 0) {
} incompleteWrite(true);
expectedWrittenBytes -= localWrittenBytes; return;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
} }
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break; break;
default: }
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { default: {
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
if (localWrittenBytes == 0) { // to check if the total size of all the buffers is non-zero.
setOpWrite = true; // We limit the max amount to int above so cast is safe
break; long attemptedBytes = in.nioBufferSize();
} final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
expectedWrittenBytes -= localWrittenBytes; if (localWrittenBytes <= 0) {
writtenBytes += localWrittenBytes; incompleteWrite(true);
if (expectedWrittenBytes == 0) { return;
done = true;
break;
}
} }
// Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break; break;
}
} }
} while (writeSpinCount > 0);
// Release the fully written buffers, and update the indexes of the partially written buffer. incompleteWrite(writeSpinCount < 0);
in.removeBytes(writtenBytes);
if (!done) {
// Did not write all buffers completely.
incompleteWrite(setOpWrite);
break;
}
}
} }
@Override @Override
@ -452,14 +459,40 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
} }
} }
private final class NioSocketChannelConfig extends DefaultSocketChannelConfig { private final class NioSocketChannelConfig extends DefaultSocketChannelConfig {
private volatile int maxBytesPerGatheringWrite = Integer.MAX_VALUE;
private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) { private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
super(channel, javaSocket); super(channel, javaSocket);
calculateMaxBytesPerGatheringWrite();
} }
@Override @Override
protected void autoReadCleared() { protected void autoReadCleared() {
clearReadPending(); clearReadPending();
} }
@Override
public NioSocketChannelConfig setSendBufferSize(int sendBufferSize) {
super.setSendBufferSize(sendBufferSize);
calculateMaxBytesPerGatheringWrite();
return this;
}
void setMaxBytesPerGatheringWrite(int maxBytesPerGatheringWrite) {
this.maxBytesPerGatheringWrite = maxBytesPerGatheringWrite;
}
int getMaxBytesPerGatheringWrite() {
return maxBytesPerGatheringWrite;
}
private void calculateMaxBytesPerGatheringWrite() {
// Multiply by 2 to give some extra space in case the OS can process write data faster than we can provide.
int newSendBufferSize = getSendBufferSize() << 1;
if (newSendBufferSize > 0) {
setMaxBytesPerGatheringWrite(getSendBufferSize() << 1);
}
}
} }
} }