Optimize native transport for gathering writes

Motivation:

While benchmarking the native transport with gathering writes I noticed that it is quite slow. This is due the fact that we need to do a lot of array copies to get the buffers into the iov array.

Modification:

Introduce a new class calles IovArray which allows to fill buffers directly in a iov array that can be passed over to JNI without any array copies. This gives a nice optimization in terms of speed when doing gathering writes.

Result:

Big performance improvement when doing gathering writes. See the included benchmark...

Before:
[nmaurer@xxx]~% wrk/wrk -H 'Host: localhost' -H 'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' -H 'Connection: keep-alive' -d 120 -c 256 -t 16 --pipeline 256  http://xxx:8080/plaintext
Running 2m test @ http://xxx:8080/plaintext
  16 threads and 256 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    23.44ms   16.37ms 259.57ms   91.77%
    Req/Sec   181.99k    31.69k  304.60k    78.12%
  346544071 requests in 2.00m, 46.48GB read
Requests/sec: 2887885.09
Transfer/sec:    396.59MB

With this change:
[nmaurer@xxx]~% wrk/wrk -H 'Host: localhost' -H 'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' -H 'Connection: keep-alive' -d 120 -c 256 -t 16 --pipeline 256  http://xxx:8080/plaintext
Running 2m test @ http://xxx:8080/plaintext
  16 threads and 256 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    21.93ms   16.33ms 305.73ms   92.34%
    Req/Sec   194.56k    33.75k  309.33k    77.04%
  369617503 requests in 2.00m, 49.57GB read
Requests/sec: 3080169.65
Transfer/sec:    423.00MB
This commit is contained in:
Norman Maurer 2014-07-22 22:27:50 +02:00
parent 44ea769f53
commit ca0571cc9d
10 changed files with 281 additions and 211 deletions

View File

@ -80,6 +80,8 @@ public final class PlatformDependent {
private static final int BIT_MODE = bitMode0();
private static final int ADDRESS_SIZE = addressSize0();
static {
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.noPreferDirect: {}", !DIRECT_BUFFER_PREFERRED);
@ -173,6 +175,22 @@ public final class PlatformDependent {
return BIT_MODE;
}
/**
* Return the address size of the OS.
* 4 (for 32 bits systems ) and 8 (for 64 bits systems).
*/
public static int addressSize() {
return ADDRESS_SIZE;
}
public static long allocateMemory(long size) {
return PlatformDependent0.allocateMemory(size);
}
public static void freeMemory(long address) {
PlatformDependent0.freeMemory(address);
}
/**
* Raises an exception bypassing compiler checks for checked exceptions.
*/
@ -815,6 +833,13 @@ public final class PlatformDependent {
}
}
private static int addressSize0() {
if (!hasUnsafe()) {
return -1;
}
return PlatformDependent0.addressSize();
}
private PlatformDependent() {
// only static method supported
}

View File

@ -365,6 +365,18 @@ final class PlatformDependent0 {
}
}
static int addressSize() {
return UNSAFE.addressSize();
}
static long allocateMemory(long size) {
return UNSAFE.allocateMemory(size);
}
static void freeMemory(long address) {
UNSAFE.freeMemory(address);
}
private PlatformDependent0() {
}

View File

@ -44,10 +44,7 @@ jfieldID limitFieldId = NULL;
jfieldID fileChannelFieldId = NULL;
jfieldID transferedFieldId = NULL;
jfieldID fdFieldId = NULL;
jfieldID fileDescriptorFieldId = NULL;
jfieldID readerIndexFieldId = NULL;
jfieldID writerIndexFieldId = NULL;
jfieldID memoryAddressFieldId = NULL;
jfieldID fileDescriptorFieldId = NULL;;
jmethodID inetSocketAddrMethodId = NULL;
jmethodID datagramSocketAddrMethodId = NULL;
jclass runtimeExceptionClass = NULL;
@ -393,27 +390,6 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
throwRuntimeException(env, "Unable to obtain constructor of DatagramSocketAddress");
return JNI_ERR;
}
jclass addressEntryClass = (*env)->FindClass(env, "io/netty/channel/epoll/EpollChannelOutboundBuffer$AddressEntry");
if (addressEntryClass == NULL) {
// pending exception...
return JNI_ERR;
}
readerIndexFieldId = (*env)->GetFieldID(env, addressEntryClass, "readerIndex", "I");
if (readerIndexFieldId == NULL) {
// pending exception...
return JNI_ERR;
}
writerIndexFieldId = (*env)->GetFieldID(env, addressEntryClass, "writerIndex", "I");
if (writerIndexFieldId == NULL) {
// pending exception...
return JNI_ERR;
}
memoryAddressFieldId = (*env)->GetFieldID(env, addressEntryClass, "memoryAddress", "J");
if (memoryAddressFieldId == NULL) {
// pending exception...
return JNI_ERR;
}
return JNI_VERSION_1_6;
}
}
@ -691,7 +667,7 @@ JNIEXPORT jobject JNICALL Java_io_netty_channel_epoll_Native_recvFromAddress(JNI
return recvFrom0(env, fd, (void*) address, pos, limit);
}
jlong writev0(JNIEnv * env, jclass clazz, jint fd, struct iovec iov[], jint length) {
jlong writev0(JNIEnv * env, jclass clazz, jint fd, struct iovec * iov, jint length) {
ssize_t res;
int err;
do {
@ -755,28 +731,8 @@ JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env,
return writev0(env, clazz, fd, iov, length);
}
JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jobjectArray addresses, jint offset, jint length) {
struct iovec iov[length];
int iovidx = 0;
int i;
int num = offset + length;
for (i = offset; i < num; i++) {
jobject addressEntry = (*env)->GetObjectArrayElement(env, addresses, i);
jint readerIndex = (*env)->GetIntField(env, addressEntry, readerIndexFieldId);
jint writerIndex = (*env)->GetIntField(env, addressEntry, writerIndexFieldId);
void* memoryAddress = (void*) (*env)->GetLongField(env, addressEntry, memoryAddressFieldId);
iov[iovidx].iov_base = memoryAddress + readerIndex;
iov[iovidx].iov_len = (size_t) (writerIndex - readerIndex);
iovidx++;
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
//
// See https://github.com/netty/netty/issues/2623
(*env)->DeleteLocalRef(env, addressEntry);
}
JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length) {
struct iovec * iov = (struct iovec *) memoryAddress;
return writev0(env, clazz, fd, iov, length);
}

View File

@ -44,7 +44,7 @@ void Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv * env, jclass clazz,
jint Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit);
jint Java_io_netty_channel_epoll_Native_writeAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit);
jlong Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length);
jlong Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jobjectArray addresses, jint offset, jint length);
jlong Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length);
jint Java_io_netty_channel_epoll_Native_sendTo(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port);
jint Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port);

View File

@ -20,17 +20,13 @@ import io.netty.channel.ChannelOutboundBuffer;
import io.netty.util.Recycler;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
* Special {@link ChannelOutboundBuffer} implementation which allows to obtain an array of {@link AddressEntry}
* Special {@link ChannelOutboundBuffer} implementation which allows to obtain a {@link IovArray}
* and so doing gathering writes without the need to create a {@link ByteBuffer} internally. This reduce
* GC pressure a lot.
*/
final class EpollChannelOutboundBuffer extends ChannelOutboundBuffer {
private AddressEntry[] addresses;
private int addressCount;
private long addressSize;
private static final Recycler<EpollChannelOutboundBuffer> RECYCLER = new Recycler<EpollChannelOutboundBuffer>() {
@Override
protected EpollChannelOutboundBuffer newObject(Handle<EpollChannelOutboundBuffer> handle) {
@ -49,7 +45,6 @@ final class EpollChannelOutboundBuffer extends ChannelOutboundBuffer {
private EpollChannelOutboundBuffer(Recycler.Handle<? extends ChannelOutboundBuffer> handle) {
super(handle);
addresses = new AddressEntry[INITIAL_CAPACITY];
}
/**
@ -68,25 +63,19 @@ final class EpollChannelOutboundBuffer extends ChannelOutboundBuffer {
}
/**
* Returns an array of {@link AddressEntry}'s if the currently pending messages are made of {@link ByteBuf} only.
* {@code null} is returned otherwise. If this method returns a non-null array, {@link #addressCount()} and
* {@link #addressSize()} ()} will return the number of {@link AddressEntry}'s in the returned array and the total
* number of readable bytes of the NIO buffers respectively.
* Returns a {@link IovArray} if the currently pending messages.
* <p>
* Note that the returned array is reused and thus should not escape
* Note that the returned {@link IovArray} is reused and thus should not escape
* {@link io.netty.channel.AbstractChannel#doWrite(ChannelOutboundBuffer)}.
* Refer to {@link EpollSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
* </p>
*/
AddressEntry[] memoryAddresses() {
long addressSize = 0;
int addressCount = 0;
IovArray iovArray() {
IovArray array = IovArray.get();
final Entry[] buffer = entries();
final int mask = buffer.length - 1;
AddressEntry[] addresses = this.addresses;
Object m;
final int mask = entryMask();
int unflushed = unflushed();
int flushed = flushed();
Object m;
while (flushed != unflushed && (m = buffer[flushed].msg()) != null) {
if (!(m instanceof ByteBuf)) {
// Just break out of the loop as we can still use gathering writes for the buffers that we
@ -94,108 +83,20 @@ final class EpollChannelOutboundBuffer extends ChannelOutboundBuffer {
break;
}
AddressEntry entry = (AddressEntry) buffer[flushed];
Entry entry = buffer[flushed];
// Check if the entry was cancelled. if so we just skip it.
if (!entry.isCancelled()) {
ByteBuf buf = (ByteBuf) m;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes > 0) {
addressSize += readableBytes;
// See if there is enough space to at least store one more entry.
int neededSpace = addressCount + 1;
if (neededSpace > addresses.length) {
this.addresses = addresses =
expandAddressesArray(addresses, neededSpace, addressCount);
}
entry.memoryAddress = buf.memoryAddress();
entry.readerIndex = buf.readerIndex();
entry.writerIndex = buf.writerIndex();
addresses[addressCount ++] = entry;
if (!array.add(buf)) {
// Can not hold more data so break here.
// We will handle this on the next write loop.
break;
}
}
flushed = flushed + 1 & mask;
}
this.addressCount = addressCount;
this.addressSize = addressSize;
return addresses;
}
private static AddressEntry[] expandAddressesArray(AddressEntry[] array, int neededSpace, int size) {
int newCapacity = array.length;
do {
// double capacity until it is big enough
// See https://github.com/netty/netty/issues/1890
newCapacity <<= 1;
if (newCapacity < 0) {
throw new IllegalStateException();
}
} while (neededSpace > newCapacity);
AddressEntry[] newArray = new AddressEntry[newCapacity];
System.arraycopy(array, 0, newArray, 0, size);
return newArray;
}
/**
* Return the number of {@link AddressEntry}'s which can be written.
*/
int addressCount() {
return addressCount;
}
/**
* Return the number of bytes that can be written via gathering writes.
*/
long addressSize() {
return addressSize;
}
@Override
public void recycle() {
if (addresses.length > INITIAL_CAPACITY) {
addresses = new AddressEntry[INITIAL_CAPACITY];
} else {
// null out the nio buffers array so the can be GC'ed
// https://github.com/netty/netty/issues/1763
Arrays.fill(addresses, null);
}
super.recycle();
}
@Override
protected AddressEntry newEntry() {
return new AddressEntry();
}
static final class AddressEntry extends Entry {
// These fields will be accessed via JNI directly so be carefully when touch them!
long memoryAddress;
int readerIndex;
int writerIndex;
@Override
public void clear() {
memoryAddress = -1;
readerIndex = 0;
writerIndex = 0;
super.clear();
}
@Override
public int cancel() {
memoryAddress = -1;
readerIndex = 0;
writerIndex = 0;
return super.cancel();
}
return array;
}
}

View File

@ -29,7 +29,6 @@ import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.epoll.EpollChannelOutboundBuffer.AddressEntry;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
@ -114,7 +113,28 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
}
boolean done = false;
long writtenBytes = 0;
if (buf.nioBufferCount() == 1) {
if (buf.hasMemoryAddress()) {
long memoryAddress = buf.memoryAddress();
int readerIndex = buf.readerIndex();
for (;;) {
int localFlushedAmount = Native.writeAddress(fd, memoryAddress, readerIndex, readableBytes);
if (localFlushedAmount > 0) {
writtenBytes += localFlushedAmount;
if (writtenBytes == readableBytes) {
done = true;
break;
}
readerIndex += localFlushedAmount;
readableBytes -= localFlushedAmount;
} else {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break;
}
}
updateOutboundBuffer(in, writtenBytes, 1, done);
return done;
} else if (buf.nioBufferCount() == 1) {
int readerIndex = buf.readerIndex();
ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, buf.readableBytes());
for (;;) {
@ -143,49 +163,44 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
}
private boolean writeBytesMultiple(
EpollChannelOutboundBuffer in, int msgCount, AddressEntry[] addresses,
int addressCnt, long expectedWrittenBytes) throws IOException {
EpollChannelOutboundBuffer in, IovArray array) throws IOException {
boolean done = false;
long expectedWrittenBytes = array.size();
int cnt = array.count();
long writtenBytes = 0;
int offset = 0;
int end = offset + addressCnt;
loop: while (addressCnt > 0) {
for (;;) {
int cnt = addressCnt > Native.IOV_MAX? Native.IOV_MAX : addressCnt;
long localWrittenBytes = Native.writevAddresses(fd, addresses, offset, cnt);
if (localWrittenBytes == 0) {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break loop;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
// Written everything, just break out here (fast-path)
done = true;
break loop;
}
do {
AddressEntry address = addresses[offset];
int readerIndex = address.readerIndex;
int bytes = address.writerIndex - readerIndex;
if (bytes > localWrittenBytes) {
address.readerIndex += (int) localWrittenBytes;
// incomplete write
break;
} else {
offset++;
addressCnt--;
localWrittenBytes -= bytes;
}
} while (offset < end && localWrittenBytes > 0);
int end = offset + cnt;
int messages = cnt;
for (;;) {
long localWrittenBytes = Native.writevAddresses(fd, array.memoryAddress(offset), cnt);
if (localWrittenBytes == 0) {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
// Written everything, just break out here (fast-path)
done = true;
break;
}
do {
long bytes = array.processWritten(offset, localWrittenBytes);
if (bytes == -1) {
// incomplete write
break;
} else {
offset++;
cnt--;
localWrittenBytes -= bytes;
}
} while (offset < end && localWrittenBytes > 0);
}
updateOutboundBuffer(in, writtenBytes, msgCount, done);
updateOutboundBuffer(in, writtenBytes, messages, done);
return done;
}
@ -315,15 +330,14 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
// Do gathering write if:
// * the outbound buffer contains more than one messages and
// * they are all buffers rather than a file region.
if (msgCount > 1) {
if (msgCount >= 1) {
if (PlatformDependent.hasUnsafe()) {
// this means we can cast to EpollChannelOutboundBuffer and write the AdressEntry directly.
// this means we can cast to EpollChannelOutboundBuffer and write the IovArray directly.
EpollChannelOutboundBuffer epollIn = (EpollChannelOutboundBuffer) in;
// Ensure the pending writes are made of memoryaddresses only.
AddressEntry[] addresses = epollIn.memoryAddresses();
int addressesCnt = epollIn.addressCount();
if (addressesCnt > 1) {
if (!writeBytesMultiple(epollIn, msgCount, addresses, addressesCnt, epollIn.addressSize())) {
IovArray array = epollIn.iovArray();
int cnt = array.count();
if (cnt > 1) {
if (!writeBytesMultiple(epollIn, array)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
break;

View File

@ -0,0 +1,159 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;
/**
* Represent an array of struct array and so can be passed directly over via JNI without the need to do any more
* array copies.
*
* The buffers are written out directly into direct memory to match the struct iov. See also <code>man writev</code>.
*
* <pre>
* struct iovec {
* void *iov_base;
* size_t iov_len;
* };
* </pre>
*
* See also
* <a href="http://rkennke.wordpress.com/2007/07/30/efficient-jni-programming-iv-wrapping-native-data-objects/">
* Efficient JNI programming IV: Wrapping native data objects</a>.
*/
final class IovArray {
// Maximal number of struct iov entries that can be passed to writev(...)
private static final int IOV_MAX = Native.IOV_MAX;
// The size of an address which should be 8 for 64 bits and 4 for 32 bits.
private static final int ADDRESS_SIZE = PlatformDependent.addressSize();
// The size of an struct iov entry in bytes. This is calculated as we have 2 entries each of the size of the
// address.
private static final int IOV_SIZE = 2 * ADDRESS_SIZE;
// The needed memory to hold up to IOV_MAX iov entries.
private static final int CAPACITY = IOV_MAX * IOV_SIZE;
private static final FastThreadLocal<IovArray> ARRAY = new FastThreadLocal<IovArray>() {
@Override
protected IovArray initialValue() throws Exception {
return new IovArray();
}
@Override
protected void onRemoval(IovArray value) throws Exception {
// free the direct memory now
PlatformDependent.freeMemory(value.memoryAddress);
}
};
private final long memoryAddress;
private int count;
private long size;
private IovArray() {
memoryAddress = PlatformDependent.allocateMemory(CAPACITY);
}
/**
* Try to add the given {@link ByteBuf}. Returns {@code true} on success,
* {@code false} otherwise.
*/
boolean add(ByteBuf buf) {
if (count == IOV_MAX) {
// No more room!
return false;
}
int len = buf.readableBytes();
long addr = buf.memoryAddress();
int offset = buf.readerIndex();
long baseOffset = memoryAddress(count++);
long lengthOffset = baseOffset + ADDRESS_SIZE;
if (ADDRESS_SIZE == 8) {
// 64bit
PlatformDependent.putLong(baseOffset, addr + offset);
PlatformDependent.putLong(lengthOffset, len);
} else {
assert ADDRESS_SIZE == 4;
PlatformDependent.putInt(baseOffset, (int) addr + offset);
PlatformDependent.putInt(lengthOffset, len);
}
size += len;
return true;
}
/**
* Process the written iov entries. This will return the length of the iov entry on the given index if it is
* smaller then the given {@code written} value. Otherwise it returns {@code -1}.
*/
long processWritten(int index, long written) {
long baseOffset = memoryAddress(index);
long lengthOffset = baseOffset + ADDRESS_SIZE;
if (ADDRESS_SIZE == 8) {
// 64bit
long len = PlatformDependent.getLong(lengthOffset);
if (len > written) {
long offset = PlatformDependent.getLong(baseOffset);
PlatformDependent.putLong(baseOffset, offset + written);
PlatformDependent.putLong(lengthOffset, len - written);
return -1;
}
return len;
} else {
assert ADDRESS_SIZE == 4;
long len = PlatformDependent.getInt(lengthOffset);
if (len > written) {
int offset = PlatformDependent.getInt(baseOffset);
PlatformDependent.putInt(baseOffset, (int) (offset + written));
PlatformDependent.putInt(lengthOffset, (int) (len - written));
return -1;
}
return len;
}
}
/**
* Returns the number if iov entries.
*/
int count() {
return count;
}
/**
* Returns the size in bytes
*/
long size() {
return size;
}
/**
* Returns the {@code memoryAddress} for the given {@code offset}.
*/
long memoryAddress(int offset) {
return memoryAddress + IOV_SIZE * offset;
}
/**
* Returns a {@link IovArray} which can be filled.
*/
static IovArray get() {
IovArray array = ARRAY.get();
array.size = 0;
array.count = 0;
return array;
}
}

View File

@ -18,7 +18,6 @@ package io.netty.channel.epoll;
import io.netty.channel.ChannelException;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.epoll.EpollChannelOutboundBuffer.AddressEntry;
import io.netty.util.internal.NativeLibraryLoader;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
@ -70,7 +69,7 @@ final class Native {
public static native int writeAddress(int fd, long address, int pos, int limit) throws IOException;
public static native long writev(int fd, ByteBuffer[] buffers, int offset, int length) throws IOException;
public static native long writevAddresses(int fd, AddressEntry[] addresses, int offset, int length)
public static native long writevAddresses(int fd, long memoryAddress, int length)
throws IOException;
public static native int read(int fd, ByteBuffer buf, int pos, int limit) throws IOException;

View File

@ -531,6 +531,10 @@ public class ChannelOutboundBuffer {
return unflushed;
}
protected final int entryMask() {
return buffer.length - 1;
}
protected ByteBuf copyToDirectByteBuf(ByteBuf buf) {
int readableBytes = buf.readableBytes();
ByteBufAllocator alloc = channel.alloc();

View File

@ -89,7 +89,7 @@ public final class NioSocketChannelOutboundBuffer extends ChannelOutboundBuffer
long nioBufferSize = 0;
int nioBufferCount = 0;
final Entry[] buffer = entries();
final int mask = buffer.length - 1;
final int mask = entryMask();
ByteBuffer[] nioBuffers = this.nioBuffers;
Object m;
int unflushed = unflushed();