Directly use memory addresses for gathering writes to reduce gc pressure. Part of [#2239]

This also does factor out some logic of ChannelOutboundBuffer. Mainly we not need nioBuffers() for many
transports and also not need to copy from heap to direct buffer. So this functionality was moved to
NioSocketChannelOutboundBuffer. Also introduce a EpollChannelOutboundBuffer which makes use of
memory addresses for all the writes to reduce GC pressure
This commit is contained in:
Norman Maurer 2014-02-18 10:08:20 +01:00
parent bea810707c
commit 60b830ba89
20 changed files with 1020 additions and 392 deletions

View File

@ -43,6 +43,9 @@ jfieldID fileChannelFieldId = NULL;
jfieldID transferedFieldId = NULL;
jfieldID fdFieldId = NULL;
jfieldID fileDescriptorFieldId = NULL;
jfieldID readerIndexFieldId = NULL;
jfieldID writerIndexFieldId = NULL;
jfieldID memoryAddressFieldId = NULL;
jmethodID inetSocketAddrMethodId = NULL;
jclass runtimeExceptionClass = NULL;
jclass ioExceptionClass = NULL;
@ -322,6 +325,27 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
return JNI_ERR;
}
socketType = socket_type();
jclass addressEntryClass = (*env)->FindClass(env, "io/netty/channel/epoll/EpollChannelOutboundBuffer$AddressEntry");
if (addressEntryClass == NULL) {
// pending exception...
return JNI_ERR;
}
readerIndexFieldId = (*env)->GetFieldID(env, addressEntryClass, "readerIndex", "I");
if (readerIndexFieldId == NULL) {
// pending exception...
return JNI_ERR;
}
writerIndexFieldId = (*env)->GetFieldID(env, addressEntryClass, "writerIndex", "I");
if (writerIndexFieldId == NULL) {
// pending exception...
return JNI_ERR;
}
memoryAddressFieldId = (*env)->GetFieldID(env, addressEntryClass, "memoryAddress", "J");
if (memoryAddressFieldId == NULL) {
// pending exception...
return JNI_ERR;
}
return JNI_VERSION_1_6;
}
}
@ -510,6 +534,29 @@ void incrementPosition(JNIEnv * env, jobject bufObj, int written) {
}
}
jlong writev0(JNIEnv * env, jclass clazz, jint fd, struct iovec iov[], jint length) {
ssize_t res;
int err;
do {
res = writev(fd, iov, length);
// keep on writing if it was interrupted
} while(res == -1 && ((err = errno) == EINTR));
if (res < 0) {
if (err == EAGAIN || err == EWOULDBLOCK) {
// network stack is saturated we will try again later
return 0;
}
if (err == EBADF) {
throwClosedChannelException(env);
return -1;
}
throwIOException(env, exceptionMessage("Error while writev(...): ", err));
return -1;
}
return (jlong) res;
}
JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length) {
struct iovec iov[length];
int i;
@ -541,32 +588,15 @@ JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env,
iov[iovidx].iov_len = (size_t) (limit - pos);
iovidx++;
}
ssize_t res;
int err;
do {
res = writev(fd, iov, length);
// keep on writing if it was interrupted
} while(res == -1 && ((err = errno) == EINTR));
if (res < 0) {
if (err == EAGAIN || err == EWOULDBLOCK) {
// network stack is saturated we will try again later
return 0;
}
if (err == EBADF) {
throwClosedChannelException(env);
return -1;
}
throwIOException(env, exceptionMessage("Error while writev(...): ", err));
return -1;
jlong res = writev0(env, clazz, fd, iov, length);
if (res <= 0) {
return res;
}
// update the position of the written buffers
int written = res;
int a;
for (a = 0; a < length; a++) {
int pos;
int len = iov[a].iov_len;
jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, a + offset);
if (len >= written) {
@ -580,6 +610,27 @@ JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env,
return res;
}
JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jobjectArray addresses, jint offset, jint length) {
struct iovec iov[length];
int i;
int iovidx = 0;
for (i = offset; i < length; i++) {
jobject addressEntry = (*env)->GetObjectArrayElement(env, addresses, i);
jint readerIndex = (*env)->GetIntField(env, addressEntry, readerIndexFieldId);
jint writerIndex = (*env)->GetIntField(env, addressEntry, writerIndexFieldId);
void* memoryAddress = (void*) (*env)->GetLongField(env, addressEntry, memoryAddressFieldId);
iov[iovidx].iov_base = memoryAddress + readerIndex;
iov[iovidx].iov_len = (size_t) (writerIndex - readerIndex);
iovidx++;
}
jlong res = writev0(env, clazz, fd, iov, length);
if (res <= 0) {
return res;
}
}
jint read0(JNIEnv * env, jclass clazz, jint fd, void *buffer, jint pos, jint limit) {
ssize_t res;
int err;

View File

@ -32,6 +32,8 @@ void Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv * env, jclass clazz,
jint Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit);
jint Java_io_netty_channel_epoll_Native_writeAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit);
jlong Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length);
jlong Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jobjectArray addresses, jint offset, jint length);
jint Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit);
jint Java_io_netty_channel_epoll_Native_readAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit);
void JNICALL Java_io_netty_channel_epoll_Native_close(JNIEnv * env, jclass clazz, jint fd);

View File

@ -0,0 +1,202 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.util.Recycler;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
* Special {@link ChannelOutboundBuffer} implementation which allows to obtain an array of {@link AddressEntry}
* and so doing gathering writes without the need to create a {@link ByteBuffer} internally. This reduce
* GC pressure a lot.
*/
final class EpollChannelOutboundBuffer extends ChannelOutboundBuffer {
private AddressEntry[] addresses;
private int addressCount;
private long addressSize;
private static final Recycler<EpollChannelOutboundBuffer> RECYCLER = new Recycler<EpollChannelOutboundBuffer>() {
@Override
protected EpollChannelOutboundBuffer newObject(Handle<EpollChannelOutboundBuffer> handle) {
return new EpollChannelOutboundBuffer(handle);
}
};
/**
* Get a new instance of this {@link EpollChannelOutboundBuffer} and attach it the given {@link EpollSocketChannel}
*/
static EpollChannelOutboundBuffer newInstance(EpollSocketChannel channel) {
EpollChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
return buffer;
}
private EpollChannelOutboundBuffer(Recycler.Handle<? extends ChannelOutboundBuffer> handle) {
super(handle);
addresses = new AddressEntry[INITIAL_CAPACITY];
}
/**
* Check if the message is a {@link ByteBuf} and if so if it has a memoryAddress. If not it will convert this
* {@link ByteBuf} to be able to operate on the memoryAddress directly for maximal performance.
*/
@Override
protected Object beforeAdd(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.hasMemoryAddress()) {
ByteBuf direct = copyToDirectByteBuf(buf);
return direct;
}
}
return msg;
}
/**
* Returns an array of {@link AddressEntry}'s if the currently pending messages are made of {@link ByteBuf} only.
* {@code null} is returned otherwise. If this method returns a non-null array, {@link #addressCount()} and
* {@link #addressSize()} ()} will return the number of {@link AddressEntry}'s in the returned array and the total
* number of readable bytes of the NIO buffers respectively.
* <p>
* Note that the returned array is reused and thus should not escape
* {@link io.netty.channel.AbstractChannel#doWrite(ChannelOutboundBuffer)}.
* Refer to {@link EpollSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
* </p>
*/
AddressEntry[] memoryAddresses() {
long addressSize = 0;
int addressCount = 0;
final Entry[] buffer = entries();
final int mask = buffer.length - 1;
AddressEntry[] addresses = this.addresses;
Object m;
int unflushed = unflushed();
int flushed = flushed();
while (flushed != unflushed && (m = buffer[flushed].msg()) != null) {
if (!(m instanceof ByteBuf)) {
this.addressCount = 0;
this.addressSize = 0;
return null;
}
AddressEntry entry = (AddressEntry) buffer[flushed];
// Check if the entry was cancelled. if so we just skip it.
if (!entry.isCancelled()) {
ByteBuf buf = (ByteBuf) m;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes > 0) {
addressSize += readableBytes;
// See if there is enough space to at least store one more entry.
int neededSpace = addressCount + 1;
if (neededSpace > addresses.length) {
this.addresses = addresses =
expandAddressesArray(addresses, neededSpace, addressCount);
}
entry.memoryAddress = buf.memoryAddress();
entry.readerIndex = buf.readerIndex();
entry.writerIndex = buf.writerIndex();
addresses[addressCount ++] = entry;
}
}
flushed = flushed + 1 & mask;
}
this.addressCount = addressCount;
this.addressSize = addressSize;
return addresses;
}
private static AddressEntry[] expandAddressesArray(AddressEntry[] array, int neededSpace, int size) {
int newCapacity = array.length;
do {
// double capacity until it is big enough
// See https://github.com/netty/netty/issues/1890
newCapacity <<= 1;
if (newCapacity < 0) {
throw new IllegalStateException();
}
} while (neededSpace > newCapacity);
AddressEntry[] newArray = new AddressEntry[newCapacity];
System.arraycopy(array, 0, newArray, 0, size);
return newArray;
}
/**
* Return the number of {@link AddressEntry}'s which can be written.
*/
int addressCount() {
return addressCount;
}
/**
* Return the number of bytes that can be written via gathering writes.
*/
long addressSize() {
return addressSize;
}
@Override
public void recycle() {
if (addresses.length > INITIAL_CAPACITY) {
addresses = new AddressEntry[INITIAL_CAPACITY];
} else {
// null out the nio buffers array so the can be GC'ed
// https://github.com/netty/netty/issues/1763
Arrays.fill(addresses, null);
}
super.recycle();
}
@Override
protected AddressEntry newEntry() {
return new AddressEntry();
}
static final class AddressEntry extends Entry {
// These fields will be accessed via JNI directly so be carefully when touch them!
long memoryAddress;
int readerIndex;
int writerIndex;
@Override
public void clear() {
memoryAddress = -1;
readerIndex = 0;
writerIndex = 0;
super.clear();
}
@Override
public int cancel() {
memoryAddress = -1;
readerIndex = 0;
writerIndex = 0;
return super.cancel();
}
}
}

View File

@ -29,9 +29,12 @@ import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.epoll.EpollChannelOutboundBuffer.AddressEntry;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannelOutboundBuffer;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import java.io.IOException;
@ -133,10 +136,49 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
}
private void writeBytesMultiple(
ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers) throws IOException {
EpollChannelOutboundBuffer in, int msgCount, AddressEntry[] nioBuffers) throws IOException {
int nioBufferCnt = in.addressCount();
long expectedWrittenBytes = in.addressSize();
long localWrittenBytes = Native.writevAddresses(fd, nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes < expectedWrittenBytes) {
setEpollOut();
// Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer.
for (int i = msgCount; i > 0; i --) {
final ByteBuf buf = (ByteBuf) in.current();
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes < localWrittenBytes) {
in.remove();
localWrittenBytes -= readableBytes;
} else if (readableBytes > localWrittenBytes) {
buf.readerIndex(readerIndex + (int) localWrittenBytes);
in.progress(localWrittenBytes);
break;
} else { // readable == writtenBytes
in.remove();
break;
}
}
} else {
// Release all buffers
for (int i = msgCount; i > 0; i --) {
in.remove();
}
}
}
private void writeBytesMultiple(
NioSocketChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers) throws IOException {
int nioBufferCnt = in.nioBufferCount();
long expectedWrittenBytes = in.nioBufferSize();
long expectedWrittenBytes = in.nioBufferCount();
long localWrittenBytes = Native.writev(fd, nioBuffers, 0, nioBufferCnt);
@ -196,16 +238,32 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
// * the outbound buffer contains more than one messages and
// * they are all buffers rather than a file region.
if (msgCount > 1) {
// Ensure the pending writes are made of ByteBufs only.
ByteBuffer[] nioBuffers = in.nioBuffers();
if (nioBuffers != null) {
writeBytesMultiple(in, msgCount, nioBuffers);
if (PlatformDependent.hasUnsafe()) {
// this means we can cast to EpollChannelOutboundBuffer and write the AdressEntry directly.
EpollChannelOutboundBuffer epollIn = (EpollChannelOutboundBuffer) in;
// Ensure the pending writes are made of memoryaddresses only.
AddressEntry[] addresses = epollIn.memoryAddresses();
if (addresses != null) {
writeBytesMultiple(epollIn, msgCount, addresses);
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
continue;
}
} else {
NioSocketChannelOutboundBuffer nioIn = (NioSocketChannelOutboundBuffer) in;
// Ensure the pending writes are made of memoryaddresses only.
ByteBuffer[] buffers = nioIn.nioBuffers();
if (buffers != null) {
writeBytesMultiple(nioIn, msgCount, buffers);
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
continue;
}
}
}
// The outbound buffer contains only one message or it contains a file region.
@ -300,24 +358,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
final class EpollSocketUnsafe extends AbstractEpollUnsafe {
private RecvByteBufAllocator.Handle allocHandle;
@Override
public void write(Object msg, ChannelPromise promise) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.isDirect()) {
// We can only handle direct buffers so we need to copy if a non direct is
// passed to write.
int readable = buf.readableBytes();
ByteBuf dst = alloc().directBuffer(readable);
dst.writeBytes(buf, buf.readerIndex(), readable);
buf.release();
msg = dst;
}
}
super.write(msg, promise);
}
private void closeOnRead(ChannelPipeline pipeline) {
inputShutdown = true;
if (isOpen()) {
@ -609,4 +649,16 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
}
}
}
@Override
protected ChannelOutboundBuffer newOutboundBuffer() {
if (PlatformDependent.hasUnsafe()) {
// This means we will be able to access the memory addresses directly and so be able to do
// gathering writes with the AddressEntry.
return EpollChannelOutboundBuffer.newInstance(this);
} else {
// No access to the memoryAddres, so fallback to use ByteBuffer[] for gathering writes.
return NioSocketChannelOutboundBuffer.newInstance(this);
}
}
}

View File

@ -17,6 +17,7 @@ package io.netty.channel.epoll;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.epoll.EpollChannelOutboundBuffer.AddressEntry;
import io.netty.util.internal.NativeLibraryLoader;
import java.io.IOException;
@ -65,6 +66,9 @@ final class Native {
public static native int writeAddress(int fd, long address, int pos, int limit) throws IOException;
public static native long writev(int fd, ByteBuffer[] buffers, int offset, int length) throws IOException;
public static native long writevAddresses(int fd, AddressEntry[] addresses, int offset, int length)
throws IOException;
public static native int read(int fd, ByteBuffer buf, int pos, int limit) throws IOException;
public static native int readAddress(int fd, long address, int pos, int limit) throws IOException;

View File

@ -20,7 +20,7 @@ import com.sun.nio.sctp.MessageInfo;
import com.sun.nio.sctp.NotificationHandler;
import com.sun.nio.sctp.SctpChannel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
@ -35,6 +35,7 @@ import io.netty.channel.sctp.SctpChannelConfig;
import io.netty.channel.sctp.SctpMessage;
import io.netty.channel.sctp.SctpNotificationHandler;
import io.netty.channel.sctp.SctpServerChannel;
import io.netty.util.Recycler;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -303,20 +304,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
return true;
}
ByteBufAllocator alloc = alloc();
boolean needsCopy = data.nioBufferCount() != 1;
if (!needsCopy) {
if (!data.isDirect() && alloc.isDirectBufferPooled()) {
needsCopy = true;
}
}
ByteBuffer nioData;
if (!needsCopy) {
nioData = data.nioBuffer();
} else {
data = alloc.directBuffer(dataLen).writeBytes(data);
nioData = data.nioBuffer();
}
ByteBuffer nioData = data.nioBuffer();
final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
mi.payloadProtocolID(packet.protocolIdentifier());
@ -325,13 +313,6 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
final int writtenBytes = javaChannel().send(nioData, mi);
boolean done = writtenBytes > 0;
if (needsCopy) {
if (!done) {
in.current(new SctpMessage(mi, data));
} else {
in.current(data);
}
}
return done;
}
@ -384,4 +365,42 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
}
return promise;
}
@Override
protected ChannelOutboundBuffer newOutboundBuffer() {
return NioSctpChannelOutboundBuffer.newInstance(this);
}
private static final class NioSctpChannelOutboundBuffer extends ChannelOutboundBuffer {
private static final Recycler<NioSctpChannelOutboundBuffer> RECYCLER =
new Recycler<NioSctpChannelOutboundBuffer>() {
@Override
protected NioSctpChannelOutboundBuffer newObject(Handle<NioSctpChannelOutboundBuffer> handle) {
return new NioSctpChannelOutboundBuffer(handle);
}
};
static NioSctpChannelOutboundBuffer newInstance(AbstractChannel channel) {
NioSctpChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
return buffer;
}
private NioSctpChannelOutboundBuffer(Recycler.Handle<NioSctpChannelOutboundBuffer> handle) {
super(handle);
}
@Override
protected Object beforeAdd(Object msg) {
if (msg instanceof SctpMessage) {
SctpMessage message = (SctpMessage) msg;
ByteBuf content = message.content();
if (!content.isDirect() || content.nioBufferCount() != 1) {
ByteBuf direct = copyToDirectByteBuf(content);
return new SctpMessage(message.protocolIdentifier(), message.streamIdentifier(), direct);
}
}
return msg;
}
}
}

View File

@ -368,7 +368,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
*/
protected abstract class AbstractUnsafe implements Unsafe {
private ChannelOutboundBuffer outboundBuffer = ChannelOutboundBuffer.newInstance(AbstractChannel.this);
private ChannelOutboundBuffer outboundBuffer = newOutboundBuffer();
private boolean inFlush0;
@Override
@ -726,6 +727,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return eventLoop;
}
/**
* Create a new {@link ChannelOutboundBuffer} which holds the pending messages for this {@link AbstractChannel}.
*/
protected ChannelOutboundBuffer newOutboundBuffer() {
return ChannelOutboundBuffer.newInstance(this);
}
/**
* Return {@code true} if the given {@link EventLoop} is compatible with this instance.
*/

View File

@ -22,21 +22,15 @@ package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.UnpooledDirectByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@ -44,18 +38,11 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
* (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
* outbound write requests.
*/
public final class ChannelOutboundBuffer {
public class ChannelOutboundBuffer {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
private static final int INITIAL_CAPACITY = 32;
private static final int threadLocalDirectBufferSize;
static {
threadLocalDirectBufferSize = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024);
logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", threadLocalDirectBufferSize);
}
protected static final int INITIAL_CAPACITY = 32;
private static final Recycler<ChannelOutboundBuffer> RECYCLER = new Recycler<ChannelOutboundBuffer>() {
@Override
@ -64,17 +51,18 @@ public final class ChannelOutboundBuffer {
}
};
/**
* Get a new instance of this {@link ChannelOutboundBuffer} and attach it the given {@link AbstractChannel}
*/
static ChannelOutboundBuffer newInstance(AbstractChannel channel) {
ChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
buffer.totalPendingSize = 0;
buffer.writable = 1;
return buffer;
}
private final Handle<ChannelOutboundBuffer> handle;
private final Handle<? extends ChannelOutboundBuffer> handle;
private AbstractChannel channel;
protected AbstractChannel channel;
// A circular buffer used to store messages. The buffer is arranged such that: flushed <= unflushed <= tail. The
// flushed messages are stored in the range [flushed, unflushed). Unflushed messages are stored in the range
@ -84,10 +72,6 @@ public final class ChannelOutboundBuffer {
private int unflushed;
private int tail;
private ByteBuffer[] nioBuffers;
private int nioBufferCount;
private long nioBufferSize;
private boolean inFail;
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER;
@ -114,18 +98,29 @@ public final class ChannelOutboundBuffer {
private volatile int writable = 1;
private ChannelOutboundBuffer(Handle<ChannelOutboundBuffer> handle) {
protected ChannelOutboundBuffer(Handle<? extends ChannelOutboundBuffer> handle) {
this.handle = handle;
buffer = new Entry[INITIAL_CAPACITY];
for (int i = 0; i < buffer.length; i++) {
buffer[i] = newEntry();
}
nioBuffers = new ByteBuffer[INITIAL_CAPACITY];
}
void addMessage(Object msg, ChannelPromise promise) {
/**
* Return the array of {@link Entry}'s which hold the pending write requests in an circular array.
*/
protected final Entry[] entries() {
return buffer;
}
/**
* Add the given message to this {@link ChannelOutboundBuffer} so it will be marked as flushed once
* {@link #addFlush()} was called. The {@link ChannelPromise} will be notified once the write operations
* completes.
*/
public final void addMessage(Object msg, ChannelPromise promise) {
msg = beforeAdd(msg);
int size = channel.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
@ -148,6 +143,17 @@ public final class ChannelOutboundBuffer {
incrementPendingOutboundBytes(size);
}
/**
* Is called before the message is actually added to the {@link ChannelOutboundBuffer} and so allow to
* convert it to a different format. Sub-classes may override this.
*/
protected Object beforeAdd(Object msg) {
return msg;
}
/**
* Expand internal array which holds the {@link Entry}'s.
*/
private void addCapacity() {
int p = flushed;
int n = buffer.length;
@ -172,7 +178,10 @@ public final class ChannelOutboundBuffer {
tail = n;
}
void addFlush() {
/**
* Mark all messages in this {@link ChannelOutboundBuffer} as flushed.
*/
public final void addFlush() {
unflushed = tail;
final int mask = buffer.length - 1;
@ -192,7 +201,7 @@ public final class ChannelOutboundBuffer {
* Increment the pending bytes which will be written at some point.
* This method is thread-safe!
*/
void incrementPendingOutboundBytes(int size) {
final void incrementPendingOutboundBytes(int size) {
// Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets
// recycled while process this method.
Channel channel = this.channel;
@ -220,7 +229,7 @@ public final class ChannelOutboundBuffer {
* Decrement the pending bytes which will be written at some point.
* This method is thread-safe!
*/
void decrementPendingOutboundBytes(int size) {
final void decrementPendingOutboundBytes(int size) {
// Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets
// recycled while process this method.
Channel channel = this.channel;
@ -257,60 +266,20 @@ public final class ChannelOutboundBuffer {
return -1;
}
public Object current() {
return current(true);
}
public Object current(boolean preferDirect) {
/**
* Return current message or {@code null} if no flushed message is left to process.
*/
public final Object current() {
if (isEmpty()) {
return null;
} else {
// TODO: Think of a smart way to handle ByteBufHolder messages
Entry entry = buffer[flushed];
Object msg = entry.msg;
if (threadLocalDirectBufferSize <= 0 || !preferDirect) {
return msg;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return buf;
} else {
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
return buf;
}
// Non-direct buffers are copied into JDK's own internal direct buffer on every I/O.
// We can do a better job by using our pooled allocator. If the current allocator does not
// pool a direct buffer, we use a ThreadLocal based pool.
ByteBufAllocator alloc = channel.alloc();
ByteBuf directBuf;
if (alloc.isDirectBufferPooled()) {
directBuf = alloc.directBuffer(readableBytes);
} else {
directBuf = ThreadLocalPooledByteBuf.newInstance();
}
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
current(directBuf);
return directBuf;
}
}
return msg;
return entry.msg;
}
}
/**
* Replace the current msg with the given one.
* The replaced msg will automatically be released
*/
public void current(Object msg) {
Entry entry = buffer[flushed];
safeRelease(entry.msg);
entry.msg = msg;
}
public void progress(long amount) {
public final void progress(long amount) {
Entry e = buffer[flushed];
ChannelPromise p = e.promise;
if (p instanceof ChannelProgressivePromise) {
@ -320,7 +289,11 @@ public final class ChannelOutboundBuffer {
}
}
public boolean remove() {
/**
* Mark the current message as successful written and remove it from this {@link ChannelOutboundBuffer}.
* This method will return {@code true} if there are more messages left to process, {@code false} otherwise.
*/
public final boolean remove() {
if (isEmpty()) {
return false;
}
@ -348,7 +321,12 @@ public final class ChannelOutboundBuffer {
return true;
}
public boolean remove(Throwable cause) {
/**
* Mark the current message as failure with the given {@link java.lang.Throwable} and remove it from this
* {@link ChannelOutboundBuffer}. This method will return {@code true} if there are more messages left to process,
* {@code false} otherwise.
*/
public final boolean remove(Throwable cause) {
if (isEmpty()) {
return false;
}
@ -377,152 +355,28 @@ public final class ChannelOutboundBuffer {
return true;
}
/**
* Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
* {@code null} is returned otherwise. If this method returns a non-null array, {@link #nioBufferCount()} and
* {@link #nioBufferSize()} will return the number of NIO buffers in the returned array and the total number
* of readable bytes of the NIO buffers respectively.
* <p>
* Note that the returned array is reused and thus should not escape
* {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
* Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
* </p>
*/
public ByteBuffer[] nioBuffers() {
long nioBufferSize = 0;
int nioBufferCount = 0;
final int mask = buffer.length - 1;
final ByteBufAllocator alloc = channel.alloc();
ByteBuffer[] nioBuffers = this.nioBuffers;
Object m;
int i = flushed;
while (i != unflushed && (m = buffer[i].msg) != null) {
if (!(m instanceof ByteBuf)) {
this.nioBufferCount = 0;
this.nioBufferSize = 0;
return null;
}
Entry entry = buffer[i];
if (!entry.cancelled) {
ByteBuf buf = (ByteBuf) m;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes > 0) {
nioBufferSize += readableBytes;
int count = entry.count;
if (count == -1) {
//noinspection ConstantValueVariableUse
entry.count = count = buf.nioBufferCount();
}
int neededSpace = nioBufferCount + count;
if (neededSpace > nioBuffers.length) {
this.nioBuffers = nioBuffers =
expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
}
if (buf.isDirect() || threadLocalDirectBufferSize <= 0) {
if (count == 1) {
ByteBuffer nioBuf = entry.buf;
if (nioBuf == null) {
// cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
// derived buffer
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
}
nioBuffers[nioBufferCount ++] = nioBuf;
} else {
ByteBuffer[] nioBufs = entry.buffers;
if (nioBufs == null) {
// cached ByteBuffers as they may be expensive to create in terms
// of Object allocation
entry.buffers = nioBufs = buf.nioBuffers();
}
nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
}
} else {
nioBufferCount = fillBufferArrayNonDirect(entry, buf, readerIndex,
readableBytes, alloc, nioBuffers, nioBufferCount);
}
}
}
i = i + 1 & mask;
}
this.nioBufferCount = nioBufferCount;
this.nioBufferSize = nioBufferSize;
return nioBuffers;
}
private static int fillBufferArray(ByteBuffer[] nioBufs, ByteBuffer[] nioBuffers, int nioBufferCount) {
for (ByteBuffer nioBuf: nioBufs) {
if (nioBuf == null) {
break;
}
nioBuffers[nioBufferCount ++] = nioBuf;
}
return nioBufferCount;
}
private static int fillBufferArrayNonDirect(Entry entry, ByteBuf buf, int readerIndex, int readableBytes,
ByteBufAllocator alloc, ByteBuffer[] nioBuffers, int nioBufferCount) {
ByteBuf directBuf;
if (alloc.isDirectBufferPooled()) {
directBuf = alloc.directBuffer(readableBytes);
} else {
directBuf = ThreadLocalPooledByteBuf.newInstance();
}
directBuf.writeBytes(buf, readerIndex, readableBytes);
buf.release();
entry.msg = directBuf;
// cache ByteBuffer
ByteBuffer nioBuf = entry.buf = directBuf.internalNioBuffer(0, readableBytes);
entry.count = 1;
nioBuffers[nioBufferCount ++] = nioBuf;
return nioBufferCount;
}
private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
int newCapacity = array.length;
do {
// double capacity until it is big enough
// See https://github.com/netty/netty/issues/1890
newCapacity <<= 1;
if (newCapacity < 0) {
throw new IllegalStateException();
}
} while (neededSpace > newCapacity);
ByteBuffer[] newArray = new ByteBuffer[newCapacity];
System.arraycopy(array, 0, newArray, 0, size);
return newArray;
}
public int nioBufferCount() {
return nioBufferCount;
}
public long nioBufferSize() {
return nioBufferSize;
}
boolean getWritable() {
final boolean getWritable() {
return writable != 0;
}
public int size() {
/**
* Return the number of messages that are ready to be written (flushed before).
*/
public final int size() {
return unflushed - flushed & buffer.length - 1;
}
public boolean isEmpty() {
/**
* Return {@code true} if this {@link ChannelOutboundBuffer} contains no flushed messages
*/
public final boolean isEmpty() {
return unflushed == flushed;
}
void failFlushed(Throwable cause) {
/**
* Fail all previous flushed messages with the given {@link Throwable}.
*/
final void failFlushed(Throwable cause) {
// Make sure that this method does not reenter. A listener added to the current promise can be notified by the
// current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
// indirectly (usually by closing the channel.)
@ -544,7 +398,10 @@ public final class ChannelOutboundBuffer {
}
}
void close(final ClosedChannelException cause) {
/**
* Fail all pending messages with the given {@link ClosedChannelException}.
*/
final void close(final ClosedChannelException cause) {
if (inFail) {
channel.eventLoop().execute(new Runnable() {
@Override
@ -596,7 +453,10 @@ public final class ChannelOutboundBuffer {
recycle();
}
private static void safeRelease(Object message) {
/**
* Release the message and log if any error happens during release.
*/
protected static void safeRelease(Object message) {
try {
ReferenceCountUtil.release(message);
} catch (Throwable t) {
@ -604,18 +464,29 @@ public final class ChannelOutboundBuffer {
}
}
/**
* Try to mark the given {@link ChannelPromise} as success and log if this failed.
*/
private static void safeSuccess(ChannelPromise promise) {
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
}
}
/**
* Try to mark the given {@link ChannelPromise} as failued with the given {@link Throwable} and log if this failed.
*/
private static void safeFail(ChannelPromise promise, Throwable cause) {
if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
}
}
/**
* Recycle this {@link ChannelOutboundBuffer}. After this was called it is disallowed to use it with the previous
* assigned {@link AbstractChannel}.
*/
@SuppressWarnings("unchecked")
public void recycle() {
if (buffer.length > INITIAL_CAPACITY) {
Entry[] e = new Entry[INITIAL_CAPACITY];
@ -623,14 +494,6 @@ public final class ChannelOutboundBuffer {
buffer = e;
}
if (nioBuffers.length > INITIAL_CAPACITY) {
nioBuffers = new ByteBuffer[INITIAL_CAPACITY];
} else {
// null out the nio buffers array so the can be GC'ed
// https://github.com/netty/netty/issues/1763
Arrays.fill(nioBuffers, null);
}
// reset flushed, unflushed and tail
// See https://github.com/netty/netty/issues/1772
flushed = 0;
@ -640,17 +503,61 @@ public final class ChannelOutboundBuffer {
// Set the channel to null so it can be GC'ed ASAP
channel = null;
RECYCLER.recycle(this, handle);
totalPendingSize = 0;
writable = 1;
RECYCLER.recycle(this, (Handle<ChannelOutboundBuffer>) handle);
}
public long totalPendingWriteBytes() {
/**
* Return the total number of pending bytes.
*/
public final long totalPendingWriteBytes() {
return totalPendingSize;
}
private static final class Entry {
/**
* Create a new {@link Entry} to use for the internal datastructure. Sub-classes may override this use a special
* sub-class.
*/
protected Entry newEntry() {
return new Entry();
}
/**
* Return the index of the first flushed message.
*/
protected final int flushed() {
return flushed;
}
/**
* Return the index of the first unflushed messages.
*/
protected final int unflushed() {
return unflushed;
}
protected ByteBuf copyToDirectByteBuf(ByteBuf buf) {
int readableBytes = buf.readableBytes();
ByteBufAllocator alloc = channel.alloc();
if (alloc.isDirectBufferPooled()) {
ByteBuf directBuf = alloc.directBuffer(readableBytes);
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
safeRelease(buf);
return directBuf;
}
if (ThreadLocalPooledDirectByteBuf.threadLocalDirectBufferSize > 0) {
ByteBuf directBuf = ThreadLocalPooledDirectByteBuf.newInstance();
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
safeRelease(buf);
return directBuf;
}
return buf;
}
protected static class Entry {
Object msg;
ByteBuffer[] buffers;
ByteBuffer buf;
ChannelPromise promise;
long progress;
long total;
@ -658,6 +565,22 @@ public final class ChannelOutboundBuffer {
int count = -1;
boolean cancelled;
public Object msg() {
return msg;
}
/**
* Return {@code true} if the {@link Entry} was cancelled via {@link #cancel()} before,
* {@code false} otherwise.
*/
public boolean isCancelled() {
return cancelled;
}
/**
* Cancel this {@link Entry} and the message that was hold by this {@link Entry}. This method returns the
* number of pending bytes for the cancelled message.
*/
public int cancel() {
if (!cancelled) {
cancelled = true;
@ -670,16 +593,15 @@ public final class ChannelOutboundBuffer {
pendingSize = 0;
total = 0;
progress = 0;
buffers = null;
buf = null;
return pSize;
}
return 0;
}
/**
* Clear this {@link Entry} and so release all resources.
*/
public void clear() {
buffers = null;
buf = null;
msg = null;
promise = null;
progress = 0;
@ -689,36 +611,4 @@ public final class ChannelOutboundBuffer {
cancelled = false;
}
}
static final class ThreadLocalPooledByteBuf extends UnpooledDirectByteBuf {
private final Recycler.Handle<ThreadLocalPooledByteBuf> handle;
private static final Recycler<ThreadLocalPooledByteBuf> RECYCLER = new Recycler<ThreadLocalPooledByteBuf>() {
@Override
protected ThreadLocalPooledByteBuf newObject(Handle<ThreadLocalPooledByteBuf> handle) {
return new ThreadLocalPooledByteBuf(handle);
}
};
private ThreadLocalPooledByteBuf(Recycler.Handle<ThreadLocalPooledByteBuf> handle) {
super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE);
this.handle = handle;
}
static ThreadLocalPooledByteBuf newInstance() {
ThreadLocalPooledByteBuf buf = RECYCLER.get();
buf.setRefCnt(1);
return buf;
}
@Override
protected void deallocate() {
if (capacity() > threadLocalDirectBufferSize) {
super.deallocate();
} else {
clear();
RECYCLER.recycle(this, handle);
}
}
}
}

View File

@ -0,0 +1,120 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/*
* Written by Josh Bloch of Google Inc. and released to the public domain,
* as explained at http://creativecommons.org/publicdomain/zero/1.0/.
*/
package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.UnpooledDirectByteBuf;
import io.netty.buffer.UnpooledUnsafeDirectByteBuf;
import io.netty.util.Recycler;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
final class ThreadLocalPooledDirectByteBuf {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(ThreadLocalPooledDirectByteBuf.class);
public static final int threadLocalDirectBufferSize;
static {
threadLocalDirectBufferSize = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024);
logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", threadLocalDirectBufferSize);
}
public static ByteBuf newInstance() {
if (PlatformDependent.hasUnsafe()) {
return ThreadLocalUnsafeDirectByteBuf.newInstance();
} else {
return ThreadLocalDirectByteBuf.newInstance();
}
}
private ThreadLocalPooledDirectByteBuf() {
// utility
}
private static final class ThreadLocalUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
private static final Recycler<ThreadLocalUnsafeDirectByteBuf> RECYCLER =
new Recycler<ThreadLocalUnsafeDirectByteBuf>() {
@Override
protected ThreadLocalUnsafeDirectByteBuf newObject(Handle<ThreadLocalUnsafeDirectByteBuf> handle) {
return new ThreadLocalUnsafeDirectByteBuf(handle);
}
};
static ThreadLocalUnsafeDirectByteBuf newInstance() {
ThreadLocalUnsafeDirectByteBuf buf = RECYCLER.get();
buf.setRefCnt(1);
return buf;
}
private final Recycler.Handle<ThreadLocalUnsafeDirectByteBuf> handle;
private ThreadLocalUnsafeDirectByteBuf(Recycler.Handle<ThreadLocalUnsafeDirectByteBuf> handle) {
super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE);
this.handle = handle;
}
@Override
protected void deallocate() {
if (capacity() > threadLocalDirectBufferSize) {
super.deallocate();
} else {
clear();
RECYCLER.recycle(this, handle);
}
}
}
private static final class ThreadLocalDirectByteBuf extends UnpooledDirectByteBuf {
private static final Recycler<ThreadLocalDirectByteBuf> RECYCLER = new Recycler<ThreadLocalDirectByteBuf>() {
@Override
protected ThreadLocalDirectByteBuf newObject(Handle<ThreadLocalDirectByteBuf> handle) {
return new ThreadLocalDirectByteBuf(handle);
}
};
static ThreadLocalDirectByteBuf newInstance() {
ThreadLocalDirectByteBuf buf = RECYCLER.get();
buf.setRefCnt(1);
return buf;
}
private final Recycler.Handle<ThreadLocalDirectByteBuf> handle;
private ThreadLocalDirectByteBuf(Recycler.Handle<ThreadLocalDirectByteBuf> handle) {
super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE);
this.handle = handle;
}
@Override
protected void deallocate() {
if (capacity() > threadLocalDirectBufferSize) {
super.deallocate();
} else {
clear();
RECYCLER.recycle(this, handle);
}
}
}
}

View File

@ -329,7 +329,7 @@ public class EmbeddedChannel extends AbstractChannel {
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
Object msg = in.current(false);
Object msg = in.current();
if (msg == null) {
break;
}

View File

@ -287,7 +287,7 @@ public class LocalChannel extends AbstractChannel {
// Use a copy because the original msgs will be recycled by AbstractChannel.
final Object[] msgsCopy = new Object[in.size()];
for (int i = 0; i < msgsCopy.length; i ++) {
msgsCopy[i] = ReferenceCountUtil.retain(in.current(false));
msgsCopy[i] = ReferenceCountUtil.retain(in.current());
in.remove();
}

View File

@ -180,7 +180,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
int writeSpinCount = -1;
for (;;) {
Object msg = in.current(true);
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
clearOpWrite();

View File

@ -55,6 +55,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
key.interestOps(interestOps & ~readInterestOp);
}
}
@Override
public void read() {
assert eventLoop().inEventLoop();

View File

@ -153,7 +153,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
Object msg = in.current(false);
Object msg = in.current();
if (msg == null) {
// nothing left to write
break;

View File

@ -16,7 +16,6 @@
package io.netty.channel.socket.nio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.AddressedEnvelope;
import io.netty.channel.Channel;
@ -26,7 +25,6 @@ import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultAddressedEnvelope;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.AbstractNioMessageChannel;
@ -261,20 +259,7 @@ public final class NioDatagramChannel
return true;
}
ByteBufAllocator alloc = alloc();
boolean needsCopy = data.nioBufferCount() != 1;
if (!needsCopy) {
if (!data.isDirect() && alloc.isDirectBufferPooled()) {
needsCopy = true;
}
}
ByteBuffer nioData;
if (!needsCopy) {
nioData = data.nioBuffer();
} else {
data = alloc.directBuffer(dataLen).writeBytes(data);
nioData = data.nioBuffer();
}
ByteBuffer nioData = data.nioBuffer();
final int writtenBytes;
if (remoteAddress != null) {
@ -284,22 +269,6 @@ public final class NioDatagramChannel
}
boolean done = writtenBytes > 0;
if (needsCopy) {
// This means we have allocated a new buffer and need to store it back so we not need to allocate it again
// later
if (remoteAddress == null) {
// remoteAddress is null which means we can handle it as ByteBuf directly
in.current(data);
} else {
if (!done) {
// store it back with all the needed informations
in.current(new DefaultAddressedEnvelope<ByteBuf, SocketAddress>(data, remoteAddress));
} else {
// Just store back the new create buffer so it is cleaned up once in.remove() is called.
in.current(data);
}
}
}
return done;
}
@ -538,4 +507,9 @@ public final class NioDatagramChannel
}
return promise;
}
@Override
protected ChannelOutboundBuffer newOutboundBuffer() {
return NioDatagramChannelOutboundBuffer.newInstance(this);
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.Recycler;
/**
* Special {@link ChannelOutboundBuffer} for {@link NioDatagramChannel} implementations.
*/
final class NioDatagramChannelOutboundBuffer extends ChannelOutboundBuffer {
private static final Recycler<NioDatagramChannelOutboundBuffer> RECYCLER =
new Recycler<NioDatagramChannelOutboundBuffer>() {
@Override
protected NioDatagramChannelOutboundBuffer newObject(Handle<NioDatagramChannelOutboundBuffer> handle) {
return new NioDatagramChannelOutboundBuffer(handle);
}
};
/**
* Get a new instance of this {@link NioSocketChannelOutboundBuffer} and attach it the given
* {@link .NioDatagramChannel}.
*/
static NioDatagramChannelOutboundBuffer newInstance(NioDatagramChannel channel) {
NioDatagramChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
return buffer;
}
private NioDatagramChannelOutboundBuffer(Recycler.Handle<NioDatagramChannelOutboundBuffer> handle) {
super(handle);
}
/**
* Convert all non direct {@link ByteBuf} to direct {@link ByteBuf}'s. This is done as the JDK implementation
* will do the conversation itself and we can do a better job here.
*/
@Override
protected Object beforeAdd(Object msg) {
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket) msg;
ByteBuf content = packet.content();
if (!content.isDirect() || content.nioBufferCount() != 1) {
ByteBuf direct = copyToDirectByteBuf(content);
return new DatagramPacket(direct, packet.recipient(), packet.sender());
}
}
return msg;
}
}

View File

@ -231,16 +231,16 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
super.doWrite(in);
return;
}
NioSocketChannelOutboundBuffer nioIn = (NioSocketChannelOutboundBuffer) in;
// Ensure the pending writes are made of ByteBufs only.
ByteBuffer[] nioBuffers = in.nioBuffers();
ByteBuffer[] nioBuffers = nioIn.nioBuffers();
if (nioBuffers == null) {
super.doWrite(in);
return;
}
int nioBufferCnt = in.nioBufferCount();
long expectedWrittenBytes = in.nioBufferSize();
int nioBufferCnt = nioIn.nioBufferCount();
long expectedWrittenBytes = nioIn.nioBufferSize();
final SocketChannel ch = javaChannel();
long writtenBytes = 0;
@ -263,7 +263,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
if (done) {
// Release all buffers
for (int i = msgCount; i > 0; i --) {
in.remove();
nioIn.remove();
}
// Finish the write loop if no new messages were flushed by in.remove().
@ -281,16 +281,16 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes < writtenBytes) {
in.progress(readableBytes);
in.remove();
nioIn.progress(readableBytes);
nioIn.remove();
writtenBytes -= readableBytes;
} else if (readableBytes > writtenBytes) {
buf.readerIndex(readerIndex + (int) writtenBytes);
in.progress(writtenBytes);
nioIn.progress(writtenBytes);
break;
} else { // readableBytes == writtenBytes
in.progress(readableBytes);
in.remove();
nioIn.progress(readableBytes);
nioIn.remove();
break;
}
}
@ -300,4 +300,9 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
}
}
}
@Override
protected ChannelOutboundBuffer newOutboundBuffer() {
return NioSocketChannelOutboundBuffer.newInstance(this);
}
}

View File

@ -0,0 +1,233 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/*
* Written by Josh Bloch of Google Inc. and released to the public domain,
* as explained at http://creativecommons.org/publicdomain/zero/1.0/.
*/
package io.netty.channel.socket.nio;
import io.netty.buffer.ByteBuf;
import io.netty.channel.AbstractChannel;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.util.Recycler;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
* Special {@link ChannelOutboundBuffer} implementation which allows to also access flushed {@link ByteBuffer} to
* allow efficent gathering writes.
*/
public final class NioSocketChannelOutboundBuffer extends ChannelOutboundBuffer {
private ByteBuffer[] nioBuffers;
private int nioBufferCount;
private long nioBufferSize;
private static final Recycler<NioSocketChannelOutboundBuffer> RECYCLER =
new Recycler<NioSocketChannelOutboundBuffer>() {
@Override
protected NioSocketChannelOutboundBuffer newObject(Handle<NioSocketChannelOutboundBuffer> handle) {
return new NioSocketChannelOutboundBuffer(handle);
}
};
/**
* Get a new instance of this {@link NioSocketChannelOutboundBuffer} and attach it the given {@link AbstractChannel}
*/
public static NioSocketChannelOutboundBuffer newInstance(AbstractChannel channel) {
NioSocketChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
return buffer;
}
private NioSocketChannelOutboundBuffer(Recycler.Handle<? extends NioSocketChannelOutboundBuffer> handle) {
super(handle);
nioBuffers = new ByteBuffer[INITIAL_CAPACITY];
}
/**
* Convert all non direct {@link ByteBuf} to direct {@link ByteBuf}'s. This is done as the JDK implementation
* will do the conversation itself and we can do a better job here.
*/
@Override
protected Object beforeAdd(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.isDirect()) {
return copyToDirectByteBuf(buf);
}
}
return msg;
}
/**
* Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
* {@code null} is returned otherwise. If this method returns a non-null array, {@link #nioBufferCount()} and
* {@link #nioBufferSize()} will return the number of NIO buffers in the returned array and the total number
* of readable bytes of the NIO buffers respectively.
* <p>
* Note that the returned array is reused and thus should not escape
* {@link io.netty.channel.AbstractChannel#doWrite(ChannelOutboundBuffer)}.
* Refer to {@link io.netty.channel.socket.nio.NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
* </p>
*/
public ByteBuffer[] nioBuffers() {
long nioBufferSize = 0;
int nioBufferCount = 0;
final Entry[] buffer = entries();
final int mask = buffer.length - 1;
ByteBuffer[] nioBuffers = this.nioBuffers;
Object m;
int unflushed = unflushed();
int i = flushed();
while (i != unflushed && (m = buffer[i].msg()) != null) {
if (!(m instanceof ByteBuf)) {
this.nioBufferCount = 0;
this.nioBufferSize = 0;
return null;
}
NioEntry entry = (NioEntry) buffer[i];
if (!entry.isCancelled()) {
ByteBuf buf = (ByteBuf) m;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes > 0) {
nioBufferSize += readableBytes;
int count = entry.count;
if (count == -1) {
//noinspection ConstantValueVariableUse
entry.count = count = buf.nioBufferCount();
}
int neededSpace = nioBufferCount + count;
if (neededSpace > nioBuffers.length) {
this.nioBuffers = nioBuffers =
expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
}
if (count == 1) {
ByteBuffer nioBuf = entry.buf;
if (nioBuf == null) {
// cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
// derived buffer
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
}
nioBuffers[nioBufferCount ++] = nioBuf;
} else {
ByteBuffer[] nioBufs = entry.buffers;
if (nioBufs == null) {
// cached ByteBuffers as they may be expensive to create in terms
// of Object allocation
entry.buffers = nioBufs = buf.nioBuffers();
}
nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
}
}
}
i = i + 1 & mask;
}
this.nioBufferCount = nioBufferCount;
this.nioBufferSize = nioBufferSize;
return nioBuffers;
}
private static int fillBufferArray(ByteBuffer[] nioBufs, ByteBuffer[] nioBuffers, int nioBufferCount) {
for (ByteBuffer nioBuf: nioBufs) {
if (nioBuf == null) {
break;
}
nioBuffers[nioBufferCount ++] = nioBuf;
}
return nioBufferCount;
}
private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
int newCapacity = array.length;
do {
// double capacity until it is big enough
// See https://github.com/netty/netty/issues/1890
newCapacity <<= 1;
if (newCapacity < 0) {
throw new IllegalStateException();
}
} while (neededSpace > newCapacity);
ByteBuffer[] newArray = new ByteBuffer[newCapacity];
System.arraycopy(array, 0, newArray, 0, size);
return newArray;
}
/**
* Return the number of {@link java.nio.ByteBuffer} which can be written.
*/
public int nioBufferCount() {
return nioBufferCount;
}
/**
* Return the number of bytes that can be written via gathering writes.
*/
public long nioBufferSize() {
return nioBufferSize;
}
@Override
public void recycle() {
// take care of recycle the ByteBuffer[] structure.
if (nioBuffers.length > INITIAL_CAPACITY) {
nioBuffers = new ByteBuffer[INITIAL_CAPACITY];
} else {
// null out the nio buffers array so the can be GC'ed
// https://github.com/netty/netty/issues/1763
Arrays.fill(nioBuffers, null);
}
super.recycle();
}
@Override
protected NioEntry newEntry() {
return new NioEntry();
}
protected static final class NioEntry extends Entry {
ByteBuffer[] buffers;
ByteBuffer buf;
int count = -1;
@Override
public void clear() {
buffers = null;
buf = null;
count = -1;
super.clear();
}
@Override
public int cancel() {
buffers = null;
buf = null;
count = -1;
return super.cancel();
}
}
}

View File

@ -238,7 +238,7 @@ public final class OioDatagramChannel extends AbstractOioMessageChannel implemen
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
final Object o = in.current(false);
final Object o = in.current();
if (o == null) {
break;
}

View File

@ -13,10 +13,12 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
package io.netty.channel.socket.nio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.AbstractChannel;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil;
import org.junit.Test;
@ -26,12 +28,12 @@ import java.nio.ByteBuffer;
import static io.netty.buffer.Unpooled.*;
import static org.junit.Assert.*;
public class ChannelOutboundBufferTest {
public class NioSocketChannelOutboundBufferTest {
@Test
public void testEmptyNioBuffers() {
AbstractChannel channel = new EmbeddedChannel();
ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel);
NioSocketChannelOutboundBuffer buffer = NioSocketChannelOutboundBuffer.newInstance(channel);
assertEquals(0, buffer.nioBufferCount());
ByteBuffer[] buffers = buffer.nioBuffers();
assertEquals(32, buffers.length);
@ -45,7 +47,7 @@ public class ChannelOutboundBufferTest {
@Test
public void testNioBuffersSingleBacked() {
AbstractChannel channel = new EmbeddedChannel();
ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel);
NioSocketChannelOutboundBuffer buffer = NioSocketChannelOutboundBuffer.newInstance(channel);
assertEquals(0, buffer.nioBufferCount());
ByteBuffer[] buffers = buffer.nioBuffers();
assertEquals(32, buffers.length);
@ -79,7 +81,7 @@ public class ChannelOutboundBufferTest {
@Test
public void testNioBuffersExpand() {
AbstractChannel channel = new EmbeddedChannel();
ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel);
NioSocketChannelOutboundBuffer buffer = NioSocketChannelOutboundBuffer.newInstance(channel);
ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII));
for (int i = 0; i < 64; i++) {
@ -104,7 +106,7 @@ public class ChannelOutboundBufferTest {
@Test
public void testNioBuffersExpand2() {
AbstractChannel channel = new EmbeddedChannel();
ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel);
NioSocketChannelOutboundBuffer buffer = NioSocketChannelOutboundBuffer.newInstance(channel);
CompositeByteBuf comp = compositeBuffer(256);
ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII));