diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c index e2b504f5cb..c3418faddd 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c @@ -43,6 +43,9 @@ jfieldID fileChannelFieldId = NULL; jfieldID transferedFieldId = NULL; jfieldID fdFieldId = NULL; jfieldID fileDescriptorFieldId = NULL; +jfieldID readerIndexFieldId = NULL; +jfieldID writerIndexFieldId = NULL; +jfieldID memoryAddressFieldId = NULL; jmethodID inetSocketAddrMethodId = NULL; jclass runtimeExceptionClass = NULL; jclass ioExceptionClass = NULL; @@ -322,6 +325,27 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { return JNI_ERR; } socketType = socket_type(); + + jclass addressEntryClass = (*env)->FindClass(env, "io/netty/channel/epoll/EpollChannelOutboundBuffer$AddressEntry"); + if (addressEntryClass == NULL) { + // pending exception... + return JNI_ERR; + } + readerIndexFieldId = (*env)->GetFieldID(env, addressEntryClass, "readerIndex", "I"); + if (readerIndexFieldId == NULL) { + // pending exception... + return JNI_ERR; + } + writerIndexFieldId = (*env)->GetFieldID(env, addressEntryClass, "writerIndex", "I"); + if (writerIndexFieldId == NULL) { + // pending exception... + return JNI_ERR; + } + memoryAddressFieldId = (*env)->GetFieldID(env, addressEntryClass, "memoryAddress", "J"); + if (memoryAddressFieldId == NULL) { + // pending exception... + return JNI_ERR; + } return JNI_VERSION_1_6; } } @@ -510,6 +534,29 @@ void incrementPosition(JNIEnv * env, jobject bufObj, int written) { } } +jlong writev0(JNIEnv * env, jclass clazz, jint fd, struct iovec iov[], jint length) { + ssize_t res; + int err; + do { + res = writev(fd, iov, length); + // keep on writing if it was interrupted + } while(res == -1 && ((err = errno) == EINTR)); + + if (res < 0) { + if (err == EAGAIN || err == EWOULDBLOCK) { + // network stack is saturated we will try again later + return 0; + } + if (err == EBADF) { + throwClosedChannelException(env); + return -1; + } + throwIOException(env, exceptionMessage("Error while writev(...): ", err)); + return -1; + } + return (jlong) res; +} + JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length) { struct iovec iov[length]; int i; @@ -541,32 +588,15 @@ JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, iov[iovidx].iov_len = (size_t) (limit - pos); iovidx++; } - - ssize_t res; - int err; - do { - res = writev(fd, iov, length); - // keep on writing if it was interrupted - } while(res == -1 && ((err = errno) == EINTR)); - - if (res < 0) { - if (err == EAGAIN || err == EWOULDBLOCK) { - // network stack is saturated we will try again later - return 0; - } - if (err == EBADF) { - throwClosedChannelException(env); - return -1; - } - throwIOException(env, exceptionMessage("Error while writev(...): ", err)); - return -1; + jlong res = writev0(env, clazz, fd, iov, length); + if (res <= 0) { + return res; } // update the position of the written buffers int written = res; int a; for (a = 0; a < length; a++) { - int pos; int len = iov[a].iov_len; jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, a + offset); if (len >= written) { @@ -580,6 +610,27 @@ JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, return res; } +JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jobjectArray addresses, jint offset, jint length) { + struct iovec iov[length]; + int i; + int iovidx = 0; + for (i = offset; i < length; i++) { + jobject addressEntry = (*env)->GetObjectArrayElement(env, addresses, i); + jint readerIndex = (*env)->GetIntField(env, addressEntry, readerIndexFieldId); + jint writerIndex = (*env)->GetIntField(env, addressEntry, writerIndexFieldId); + void* memoryAddress = (void*) (*env)->GetLongField(env, addressEntry, memoryAddressFieldId); + + iov[iovidx].iov_base = memoryAddress + readerIndex; + iov[iovidx].iov_len = (size_t) (writerIndex - readerIndex); + iovidx++; + } + + jlong res = writev0(env, clazz, fd, iov, length); + if (res <= 0) { + return res; + } +} + jint read0(JNIEnv * env, jclass clazz, jint fd, void *buffer, jint pos, jint limit) { ssize_t res; int err; diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h index 2d76071760..76753cca2b 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h @@ -32,6 +32,8 @@ void Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv * env, jclass clazz, jint Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit); jint Java_io_netty_channel_epoll_Native_writeAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit); jlong Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length); +jlong Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jobjectArray addresses, jint offset, jint length); + jint Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit); jint Java_io_netty_channel_epoll_Native_readAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit); void JNICALL Java_io_netty_channel_epoll_Native_close(JNIEnv * env, jclass clazz, jint fd); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOutboundBuffer.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOutboundBuffer.java new file mode 100644 index 0000000000..23df74cf1c --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOutboundBuffer.java @@ -0,0 +1,202 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.util.Recycler; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +/** + * Special {@link ChannelOutboundBuffer} implementation which allows to obtain an array of {@link AddressEntry} + * and so doing gathering writes without the need to create a {@link ByteBuffer} internally. This reduce + * GC pressure a lot. + */ +final class EpollChannelOutboundBuffer extends ChannelOutboundBuffer { + private AddressEntry[] addresses; + private int addressCount; + private long addressSize; + private static final Recycler RECYCLER = new Recycler() { + @Override + protected EpollChannelOutboundBuffer newObject(Handle handle) { + return new EpollChannelOutboundBuffer(handle); + } + }; + + /** + * Get a new instance of this {@link EpollChannelOutboundBuffer} and attach it the given {@link EpollSocketChannel} + */ + static EpollChannelOutboundBuffer newInstance(EpollSocketChannel channel) { + EpollChannelOutboundBuffer buffer = RECYCLER.get(); + buffer.channel = channel; + return buffer; + } + + private EpollChannelOutboundBuffer(Recycler.Handle handle) { + super(handle); + addresses = new AddressEntry[INITIAL_CAPACITY]; + } + + /** + * Check if the message is a {@link ByteBuf} and if so if it has a memoryAddress. If not it will convert this + * {@link ByteBuf} to be able to operate on the memoryAddress directly for maximal performance. + */ + @Override + protected Object beforeAdd(Object msg) { + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + if (!buf.hasMemoryAddress()) { + ByteBuf direct = copyToDirectByteBuf(buf); + return direct; + } + } + return msg; + } + + /** + * Returns an array of {@link AddressEntry}'s if the currently pending messages are made of {@link ByteBuf} only. + * {@code null} is returned otherwise. If this method returns a non-null array, {@link #addressCount()} and + * {@link #addressSize()} ()} will return the number of {@link AddressEntry}'s in the returned array and the total + * number of readable bytes of the NIO buffers respectively. + *

+ * Note that the returned array is reused and thus should not escape + * {@link io.netty.channel.AbstractChannel#doWrite(ChannelOutboundBuffer)}. + * Refer to {@link EpollSocketChannel#doWrite(ChannelOutboundBuffer)} for an example. + *

+ */ + AddressEntry[] memoryAddresses() { + long addressSize = 0; + int addressCount = 0; + final Entry[] buffer = entries(); + final int mask = buffer.length - 1; + AddressEntry[] addresses = this.addresses; + Object m; + int unflushed = unflushed(); + int flushed = flushed(); + while (flushed != unflushed && (m = buffer[flushed].msg()) != null) { + if (!(m instanceof ByteBuf)) { + this.addressCount = 0; + this.addressSize = 0; + return null; + } + + AddressEntry entry = (AddressEntry) buffer[flushed]; + + // Check if the entry was cancelled. if so we just skip it. + if (!entry.isCancelled()) { + ByteBuf buf = (ByteBuf) m; + final int readerIndex = buf.readerIndex(); + final int readableBytes = buf.writerIndex() - readerIndex; + + if (readableBytes > 0) { + addressSize += readableBytes; + // See if there is enough space to at least store one more entry. + int neededSpace = addressCount + 1; + if (neededSpace > addresses.length) { + this.addresses = addresses = + expandAddressesArray(addresses, neededSpace, addressCount); + } + entry.memoryAddress = buf.memoryAddress(); + entry.readerIndex = buf.readerIndex(); + entry.writerIndex = buf.writerIndex(); + + addresses[addressCount ++] = entry; + } + } + + flushed = flushed + 1 & mask; + } + this.addressCount = addressCount; + this.addressSize = addressSize; + + return addresses; + } + + private static AddressEntry[] expandAddressesArray(AddressEntry[] array, int neededSpace, int size) { + int newCapacity = array.length; + do { + // double capacity until it is big enough + // See https://github.com/netty/netty/issues/1890 + newCapacity <<= 1; + + if (newCapacity < 0) { + throw new IllegalStateException(); + } + + } while (neededSpace > newCapacity); + + AddressEntry[] newArray = new AddressEntry[newCapacity]; + System.arraycopy(array, 0, newArray, 0, size); + + return newArray; + } + + /** + * Return the number of {@link AddressEntry}'s which can be written. + */ + int addressCount() { + return addressCount; + } + + /** + * Return the number of bytes that can be written via gathering writes. + */ + long addressSize() { + return addressSize; + } + + @Override + public void recycle() { + if (addresses.length > INITIAL_CAPACITY) { + addresses = new AddressEntry[INITIAL_CAPACITY]; + } else { + // null out the nio buffers array so the can be GC'ed + // https://github.com/netty/netty/issues/1763 + Arrays.fill(addresses, null); + } + super.recycle(); + } + + @Override + protected AddressEntry newEntry() { + return new AddressEntry(); + } + + static final class AddressEntry extends Entry { + // These fields will be accessed via JNI directly so be carefully when touch them! + long memoryAddress; + int readerIndex; + int writerIndex; + + @Override + public void clear() { + memoryAddress = -1; + readerIndex = 0; + writerIndex = 0; + super.clear(); + } + + @Override + public int cancel() { + memoryAddress = -1; + readerIndex = 0; + writerIndex = 0; + return super.cancel(); + } + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 4493c21af0..988b09bff1 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -29,9 +29,12 @@ import io.netty.channel.ConnectTimeoutException; import io.netty.channel.DefaultFileRegion; import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.epoll.EpollChannelOutboundBuffer.AddressEntry; import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannelOutboundBuffer; +import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import java.io.IOException; @@ -133,10 +136,49 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So } private void writeBytesMultiple( - ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers) throws IOException { + EpollChannelOutboundBuffer in, int msgCount, AddressEntry[] nioBuffers) throws IOException { + + int nioBufferCnt = in.addressCount(); + long expectedWrittenBytes = in.addressSize(); + + long localWrittenBytes = Native.writevAddresses(fd, nioBuffers, 0, nioBufferCnt); + + if (localWrittenBytes < expectedWrittenBytes) { + setEpollOut(); + + // Did not write all buffers completely. + // Release the fully written buffers and update the indexes of the partially written buffer. + for (int i = msgCount; i > 0; i --) { + final ByteBuf buf = (ByteBuf) in.current(); + final int readerIndex = buf.readerIndex(); + final int readableBytes = buf.writerIndex() - readerIndex; + + if (readableBytes < localWrittenBytes) { + in.remove(); + localWrittenBytes -= readableBytes; + } else if (readableBytes > localWrittenBytes) { + + buf.readerIndex(readerIndex + (int) localWrittenBytes); + in.progress(localWrittenBytes); + break; + } else { // readable == writtenBytes + in.remove(); + break; + } + } + } else { + // Release all buffers + for (int i = msgCount; i > 0; i --) { + in.remove(); + } + } + } + + private void writeBytesMultiple( + NioSocketChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers) throws IOException { int nioBufferCnt = in.nioBufferCount(); - long expectedWrittenBytes = in.nioBufferSize(); + long expectedWrittenBytes = in.nioBufferCount(); long localWrittenBytes = Native.writev(fd, nioBuffers, 0, nioBufferCnt); @@ -196,15 +238,31 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So // * the outbound buffer contains more than one messages and // * they are all buffers rather than a file region. if (msgCount > 1) { - // Ensure the pending writes are made of ByteBufs only. - ByteBuffer[] nioBuffers = in.nioBuffers(); - if (nioBuffers != null) { - writeBytesMultiple(in, msgCount, nioBuffers); + if (PlatformDependent.hasUnsafe()) { + // this means we can cast to EpollChannelOutboundBuffer and write the AdressEntry directly. + EpollChannelOutboundBuffer epollIn = (EpollChannelOutboundBuffer) in; + // Ensure the pending writes are made of memoryaddresses only. + AddressEntry[] addresses = epollIn.memoryAddresses(); + if (addresses != null) { + writeBytesMultiple(epollIn, msgCount, addresses); - // We do not break the loop here even if the outbound buffer was flushed completely, - // because a user might have triggered another write and flush when we notify his or her - // listeners. - continue; + // We do not break the loop here even if the outbound buffer was flushed completely, + // because a user might have triggered another write and flush when we notify his or her + // listeners. + continue; + } + } else { + NioSocketChannelOutboundBuffer nioIn = (NioSocketChannelOutboundBuffer) in; + // Ensure the pending writes are made of memoryaddresses only. + ByteBuffer[] buffers = nioIn.nioBuffers(); + if (buffers != null) { + writeBytesMultiple(nioIn, msgCount, buffers); + + // We do not break the loop here even if the outbound buffer was flushed completely, + // because a user might have triggered another write and flush when we notify his or her + // listeners. + continue; + } } } @@ -300,24 +358,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So final class EpollSocketUnsafe extends AbstractEpollUnsafe { private RecvByteBufAllocator.Handle allocHandle; - @Override - public void write(Object msg, ChannelPromise promise) { - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; - if (!buf.isDirect()) { - // We can only handle direct buffers so we need to copy if a non direct is - // passed to write. - int readable = buf.readableBytes(); - ByteBuf dst = alloc().directBuffer(readable); - dst.writeBytes(buf, buf.readerIndex(), readable); - - buf.release(); - msg = dst; - } - } - super.write(msg, promise); - } - private void closeOnRead(ChannelPipeline pipeline) { inputShutdown = true; if (isOpen()) { @@ -609,4 +649,16 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So } } } + + @Override + protected ChannelOutboundBuffer newOutboundBuffer() { + if (PlatformDependent.hasUnsafe()) { + // This means we will be able to access the memory addresses directly and so be able to do + // gathering writes with the AddressEntry. + return EpollChannelOutboundBuffer.newInstance(this); + } else { + // No access to the memoryAddres, so fallback to use ByteBuffer[] for gathering writes. + return NioSocketChannelOutboundBuffer.newInstance(this); + } + } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index e255149d75..8633743b33 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -17,6 +17,7 @@ package io.netty.channel.epoll; import io.netty.channel.DefaultFileRegion; +import io.netty.channel.epoll.EpollChannelOutboundBuffer.AddressEntry; import io.netty.util.internal.NativeLibraryLoader; import java.io.IOException; @@ -65,6 +66,9 @@ final class Native { public static native int writeAddress(int fd, long address, int pos, int limit) throws IOException; public static native long writev(int fd, ByteBuffer[] buffers, int offset, int length) throws IOException; + public static native long writevAddresses(int fd, AddressEntry[] addresses, int offset, int length) + throws IOException; + public static native int read(int fd, ByteBuffer buf, int pos, int limit) throws IOException; public static native int readAddress(int fd, long address, int pos, int limit) throws IOException; diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java index b0ef61007f..80d4b7e6e9 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java @@ -20,7 +20,7 @@ import com.sun.nio.sctp.MessageInfo; import com.sun.nio.sctp.NotificationHandler; import com.sun.nio.sctp.SctpChannel; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; @@ -34,6 +34,7 @@ import io.netty.channel.sctp.SctpChannelConfig; import io.netty.channel.sctp.SctpMessage; import io.netty.channel.sctp.SctpNotificationHandler; import io.netty.channel.sctp.SctpServerChannel; +import io.netty.util.Recycler; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -302,20 +303,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett return true; } - ByteBufAllocator alloc = alloc(); - boolean needsCopy = data.nioBufferCount() != 1; - if (!needsCopy) { - if (!data.isDirect() && alloc.isDirectBufferPooled()) { - needsCopy = true; - } - } - ByteBuffer nioData; - if (!needsCopy) { - nioData = data.nioBuffer(); - } else { - data = alloc.directBuffer(dataLen).writeBytes(data); - nioData = data.nioBuffer(); - } + ByteBuffer nioData = data.nioBuffer(); final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier()); mi.payloadProtocolID(packet.protocolIdentifier()); @@ -324,13 +312,6 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett final int writtenBytes = javaChannel().send(nioData, mi); boolean done = writtenBytes > 0; - if (needsCopy) { - if (!done) { - in.current(new SctpMessage(mi, data)); - } else { - in.current(data); - } - } return done; } @@ -383,4 +364,42 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett } return promise; } + + @Override + protected ChannelOutboundBuffer newOutboundBuffer() { + return NioSctpChannelOutboundBuffer.newInstance(this); + } + + private static final class NioSctpChannelOutboundBuffer extends ChannelOutboundBuffer { + private static final Recycler RECYCLER = + new Recycler() { + @Override + protected NioSctpChannelOutboundBuffer newObject(Handle handle) { + return new NioSctpChannelOutboundBuffer(handle); + } + }; + + static NioSctpChannelOutboundBuffer newInstance(AbstractChannel channel) { + NioSctpChannelOutboundBuffer buffer = RECYCLER.get(); + buffer.channel = channel; + return buffer; + } + + private NioSctpChannelOutboundBuffer(Recycler.Handle handle) { + super(handle); + } + + @Override + protected Object beforeAdd(Object msg) { + if (msg instanceof SctpMessage) { + SctpMessage message = (SctpMessage) msg; + ByteBuf content = message.content(); + if (!content.isDirect() || content.nioBufferCount() != 1) { + ByteBuf direct = copyToDirectByteBuf(content); + return new SctpMessage(message.protocolIdentifier(), message.streamIdentifier(), direct); + } + } + return msg; + } + } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index b185c88612..f153a652c8 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -382,7 +382,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha */ protected abstract class AbstractUnsafe implements Unsafe { - private ChannelOutboundBuffer outboundBuffer = ChannelOutboundBuffer.newInstance(AbstractChannel.this); + private ChannelOutboundBuffer outboundBuffer = newOutboundBuffer(); + private boolean inFlush0; @Override @@ -769,6 +770,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } + /** + * Create a new {@link ChannelOutboundBuffer} which holds the pending messages for this {@link AbstractChannel}. + */ + protected ChannelOutboundBuffer newOutboundBuffer() { + return ChannelOutboundBuffer.newInstance(this); + } + /** * Return {@code true} if the given {@link EventLoop} is compatible with this instance. */ diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index 8228987e48..6fb1315ab6 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -22,21 +22,15 @@ package io.netty.channel; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufHolder; -import io.netty.buffer.UnpooledByteBufAllocator; -import io.netty.buffer.UnpooledDirectByteBuf; import io.netty.buffer.Unpooled; -import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; -import java.util.Arrays; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -44,18 +38,11 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending * outbound write requests. */ -public final class ChannelOutboundBuffer { +public class ChannelOutboundBuffer { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class); - private static final int INITIAL_CAPACITY = 32; - - private static final int threadLocalDirectBufferSize; - - static { - threadLocalDirectBufferSize = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024); - logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", threadLocalDirectBufferSize); - } + protected static final int INITIAL_CAPACITY = 32; private static final Recycler RECYCLER = new Recycler() { @Override @@ -64,17 +51,18 @@ public final class ChannelOutboundBuffer { } }; + /** + * Get a new instance of this {@link ChannelOutboundBuffer} and attach it the given {@link AbstractChannel} + */ static ChannelOutboundBuffer newInstance(AbstractChannel channel) { ChannelOutboundBuffer buffer = RECYCLER.get(); buffer.channel = channel; - buffer.totalPendingSize = 0; - buffer.writable = 1; return buffer; } - private final Handle handle; + private final Handle handle; - private AbstractChannel channel; + protected AbstractChannel channel; // A circular buffer used to store messages. The buffer is arranged such that: flushed <= unflushed <= tail. The // flushed messages are stored in the range [flushed, unflushed). Unflushed messages are stored in the range @@ -84,10 +72,6 @@ public final class ChannelOutboundBuffer { private int unflushed; private int tail; - private ByteBuffer[] nioBuffers; - private int nioBufferCount; - private long nioBufferSize; - private boolean inFail; private static final AtomicLongFieldUpdater TOTAL_PENDING_SIZE_UPDATER; @@ -114,18 +98,29 @@ public final class ChannelOutboundBuffer { private volatile int writable = 1; - private ChannelOutboundBuffer(Handle handle) { + protected ChannelOutboundBuffer(Handle handle) { this.handle = handle; buffer = new Entry[INITIAL_CAPACITY]; for (int i = 0; i < buffer.length; i++) { - buffer[i] = new Entry(); + buffer[i] = newEntry(); } - - nioBuffers = new ByteBuffer[INITIAL_CAPACITY]; } - void addMessage(Object msg, ChannelPromise promise) { + /** + * Return the array of {@link Entry}'s which hold the pending write requests in an circular array. + */ + protected final Entry[] entries() { + return buffer; + } + + /** + * Add the given message to this {@link ChannelOutboundBuffer} so it will be marked as flushed once + * {@link #addFlush()} was called. The {@link ChannelPromise} will be notified once the write operations + * completes. + */ + public final void addMessage(Object msg, ChannelPromise promise) { + msg = beforeAdd(msg); int size = channel.estimatorHandle().size(msg); if (size < 0) { size = 0; @@ -148,6 +143,17 @@ public final class ChannelOutboundBuffer { incrementPendingOutboundBytes(size); } + /** + * Is called before the message is actually added to the {@link ChannelOutboundBuffer} and so allow to + * convert it to a different format. Sub-classes may override this. + */ + protected Object beforeAdd(Object msg) { + return msg; + } + + /** + * Expand internal array which holds the {@link Entry}'s. + */ private void addCapacity() { int p = flushed; int n = buffer.length; @@ -163,7 +169,7 @@ public final class ChannelOutboundBuffer { System.arraycopy(buffer, p, e, 0, r); System.arraycopy(buffer, 0, e, r, p); for (int i = n; i < e.length; i++) { - e[i] = new Entry(); + e[i] = newEntry(); } buffer = e; @@ -172,7 +178,10 @@ public final class ChannelOutboundBuffer { tail = n; } - void addFlush() { + /** + * Mark all messages in this {@link ChannelOutboundBuffer} as flushed. + */ + public final void addFlush() { unflushed = tail; final int mask = buffer.length - 1; @@ -192,7 +201,7 @@ public final class ChannelOutboundBuffer { * Increment the pending bytes which will be written at some point. * This method is thread-safe! */ - void incrementPendingOutboundBytes(int size) { + final void incrementPendingOutboundBytes(int size) { // Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets // recycled while process this method. Channel channel = this.channel; @@ -220,7 +229,7 @@ public final class ChannelOutboundBuffer { * Decrement the pending bytes which will be written at some point. * This method is thread-safe! */ - void decrementPendingOutboundBytes(int size) { + final void decrementPendingOutboundBytes(int size) { // Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets // recycled while process this method. Channel channel = this.channel; @@ -257,60 +266,20 @@ public final class ChannelOutboundBuffer { return -1; } - public Object current() { - return current(true); - } - - public Object current(boolean preferDirect) { + /** + * Return current message or {@code null} if no flushed message is left to process. + */ + public final Object current() { if (isEmpty()) { return null; } else { // TODO: Think of a smart way to handle ByteBufHolder messages Entry entry = buffer[flushed]; - Object msg = entry.msg; - if (threadLocalDirectBufferSize <= 0 || !preferDirect) { - return msg; - } - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; - if (buf.isDirect()) { - return buf; - } else { - int readableBytes = buf.readableBytes(); - if (readableBytes == 0) { - return buf; - } - - // Non-direct buffers are copied into JDK's own internal direct buffer on every I/O. - // We can do a better job by using our pooled allocator. If the current allocator does not - // pool a direct buffer, we use a ThreadLocal based pool. - ByteBufAllocator alloc = channel.alloc(); - ByteBuf directBuf; - if (alloc.isDirectBufferPooled()) { - directBuf = alloc.directBuffer(readableBytes); - } else { - directBuf = ThreadLocalPooledByteBuf.newInstance(); - } - directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); - current(directBuf); - return directBuf; - } - } - return msg; + return entry.msg; } } - /** - * Replace the current msg with the given one. - * The replaced msg will automatically be released - */ - public void current(Object msg) { - Entry entry = buffer[flushed]; - safeRelease(entry.msg); - entry.msg = msg; - } - - public void progress(long amount) { + public final void progress(long amount) { Entry e = buffer[flushed]; ChannelPromise p = e.promise; if (p instanceof ChannelProgressivePromise) { @@ -320,7 +289,11 @@ public final class ChannelOutboundBuffer { } } - public boolean remove() { + /** + * Mark the current message as successful written and remove it from this {@link ChannelOutboundBuffer}. + * This method will return {@code true} if there are more messages left to process, {@code false} otherwise. + */ + public final boolean remove() { if (isEmpty()) { return false; } @@ -348,7 +321,12 @@ public final class ChannelOutboundBuffer { return true; } - public boolean remove(Throwable cause) { + /** + * Mark the current message as failure with the given {@link java.lang.Throwable} and remove it from this + * {@link ChannelOutboundBuffer}. This method will return {@code true} if there are more messages left to process, + * {@code false} otherwise. + */ + public final boolean remove(Throwable cause) { if (isEmpty()) { return false; } @@ -377,152 +355,28 @@ public final class ChannelOutboundBuffer { return true; } - /** - * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only. - * {@code null} is returned otherwise. If this method returns a non-null array, {@link #nioBufferCount()} and - * {@link #nioBufferSize()} will return the number of NIO buffers in the returned array and the total number - * of readable bytes of the NIO buffers respectively. - *

- * Note that the returned array is reused and thus should not escape - * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}. - * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example. - *

- */ - public ByteBuffer[] nioBuffers() { - long nioBufferSize = 0; - int nioBufferCount = 0; - final int mask = buffer.length - 1; - final ByteBufAllocator alloc = channel.alloc(); - ByteBuffer[] nioBuffers = this.nioBuffers; - Object m; - int i = flushed; - while (i != unflushed && (m = buffer[i].msg) != null) { - if (!(m instanceof ByteBuf)) { - this.nioBufferCount = 0; - this.nioBufferSize = 0; - return null; - } - - Entry entry = buffer[i]; - - if (!entry.cancelled) { - ByteBuf buf = (ByteBuf) m; - final int readerIndex = buf.readerIndex(); - final int readableBytes = buf.writerIndex() - readerIndex; - - if (readableBytes > 0) { - nioBufferSize += readableBytes; - int count = entry.count; - if (count == -1) { - //noinspection ConstantValueVariableUse - entry.count = count = buf.nioBufferCount(); - } - int neededSpace = nioBufferCount + count; - if (neededSpace > nioBuffers.length) { - this.nioBuffers = nioBuffers = - expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); - } - if (buf.isDirect() || threadLocalDirectBufferSize <= 0) { - if (count == 1) { - ByteBuffer nioBuf = entry.buf; - if (nioBuf == null) { - // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a - // derived buffer - entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); - } - nioBuffers[nioBufferCount ++] = nioBuf; - } else { - ByteBuffer[] nioBufs = entry.buffers; - if (nioBufs == null) { - // cached ByteBuffers as they may be expensive to create in terms - // of Object allocation - entry.buffers = nioBufs = buf.nioBuffers(); - } - nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount); - } - } else { - nioBufferCount = fillBufferArrayNonDirect(entry, buf, readerIndex, - readableBytes, alloc, nioBuffers, nioBufferCount); - } - } - } - - i = i + 1 & mask; - } - this.nioBufferCount = nioBufferCount; - this.nioBufferSize = nioBufferSize; - - return nioBuffers; - } - - private static int fillBufferArray(ByteBuffer[] nioBufs, ByteBuffer[] nioBuffers, int nioBufferCount) { - for (ByteBuffer nioBuf: nioBufs) { - if (nioBuf == null) { - break; - } - nioBuffers[nioBufferCount ++] = nioBuf; - } - return nioBufferCount; - } - - private static int fillBufferArrayNonDirect(Entry entry, ByteBuf buf, int readerIndex, int readableBytes, - ByteBufAllocator alloc, ByteBuffer[] nioBuffers, int nioBufferCount) { - ByteBuf directBuf; - if (alloc.isDirectBufferPooled()) { - directBuf = alloc.directBuffer(readableBytes); - } else { - directBuf = ThreadLocalPooledByteBuf.newInstance(); - } - directBuf.writeBytes(buf, readerIndex, readableBytes); - buf.release(); - entry.msg = directBuf; - // cache ByteBuffer - ByteBuffer nioBuf = entry.buf = directBuf.internalNioBuffer(0, readableBytes); - entry.count = 1; - nioBuffers[nioBufferCount ++] = nioBuf; - return nioBufferCount; - } - - private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) { - int newCapacity = array.length; - do { - // double capacity until it is big enough - // See https://github.com/netty/netty/issues/1890 - newCapacity <<= 1; - - if (newCapacity < 0) { - throw new IllegalStateException(); - } - - } while (neededSpace > newCapacity); - - ByteBuffer[] newArray = new ByteBuffer[newCapacity]; - System.arraycopy(array, 0, newArray, 0, size); - - return newArray; - } - - public int nioBufferCount() { - return nioBufferCount; - } - - public long nioBufferSize() { - return nioBufferSize; - } - - boolean getWritable() { + final boolean getWritable() { return writable != 0; } - public int size() { + /** + * Return the number of messages that are ready to be written (flushed before). + */ + public final int size() { return unflushed - flushed & buffer.length - 1; } - public boolean isEmpty() { + /** + * Return {@code true} if this {@link ChannelOutboundBuffer} contains no flushed messages + */ + public final boolean isEmpty() { return unflushed == flushed; } - void failFlushed(Throwable cause) { + /** + * Fail all previous flushed messages with the given {@link Throwable}. + */ + final void failFlushed(Throwable cause) { // Make sure that this method does not reenter. A listener added to the current promise can be notified by the // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call // indirectly (usually by closing the channel.) @@ -544,7 +398,10 @@ public final class ChannelOutboundBuffer { } } - void close(final ClosedChannelException cause) { + /** + * Fail all pending messages with the given {@link ClosedChannelException}. + */ + final void close(final ClosedChannelException cause) { if (inFail) { channel.eventLoop().execute(new Runnable() { @Override @@ -596,7 +453,10 @@ public final class ChannelOutboundBuffer { recycle(); } - private static void safeRelease(Object message) { + /** + * Release the message and log if any error happens during release. + */ + protected static void safeRelease(Object message) { try { ReferenceCountUtil.release(message); } catch (Throwable t) { @@ -604,18 +464,29 @@ public final class ChannelOutboundBuffer { } } + /** + * Try to mark the given {@link ChannelPromise} as success and log if this failed. + */ private static void safeSuccess(ChannelPromise promise) { if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) { logger.warn("Failed to mark a promise as success because it is done already: {}", promise); } } + /** + * Try to mark the given {@link ChannelPromise} as failued with the given {@link Throwable} and log if this failed. + */ private static void safeFail(ChannelPromise promise, Throwable cause) { if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) { logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause); } } + /** + * Recycle this {@link ChannelOutboundBuffer}. After this was called it is disallowed to use it with the previous + * assigned {@link AbstractChannel}. + */ + @SuppressWarnings("unchecked") public void recycle() { if (buffer.length > INITIAL_CAPACITY) { Entry[] e = new Entry[INITIAL_CAPACITY]; @@ -623,14 +494,6 @@ public final class ChannelOutboundBuffer { buffer = e; } - if (nioBuffers.length > INITIAL_CAPACITY) { - nioBuffers = new ByteBuffer[INITIAL_CAPACITY]; - } else { - // null out the nio buffers array so the can be GC'ed - // https://github.com/netty/netty/issues/1763 - Arrays.fill(nioBuffers, null); - } - // reset flushed, unflushed and tail // See https://github.com/netty/netty/issues/1772 flushed = 0; @@ -640,17 +503,61 @@ public final class ChannelOutboundBuffer { // Set the channel to null so it can be GC'ed ASAP channel = null; - RECYCLER.recycle(this, handle); + totalPendingSize = 0; + writable = 1; + + RECYCLER.recycle(this, (Handle) handle); } - public long totalPendingWriteBytes() { + /** + * Return the total number of pending bytes. + */ + public final long totalPendingWriteBytes() { return totalPendingSize; } - private static final class Entry { + /** + * Create a new {@link Entry} to use for the internal datastructure. Sub-classes may override this use a special + * sub-class. + */ + protected Entry newEntry() { + return new Entry(); + } + + /** + * Return the index of the first flushed message. + */ + protected final int flushed() { + return flushed; + } + + /** + * Return the index of the first unflushed messages. + */ + protected final int unflushed() { + return unflushed; + } + + protected ByteBuf copyToDirectByteBuf(ByteBuf buf) { + int readableBytes = buf.readableBytes(); + ByteBufAllocator alloc = channel.alloc(); + if (alloc.isDirectBufferPooled()) { + ByteBuf directBuf = alloc.directBuffer(readableBytes); + directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); + safeRelease(buf); + return directBuf; + } + if (ThreadLocalPooledDirectByteBuf.threadLocalDirectBufferSize > 0) { + ByteBuf directBuf = ThreadLocalPooledDirectByteBuf.newInstance(); + directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); + safeRelease(buf); + return directBuf; + } + return buf; + } + + protected static class Entry { Object msg; - ByteBuffer[] buffers; - ByteBuffer buf; ChannelPromise promise; long progress; long total; @@ -658,6 +565,22 @@ public final class ChannelOutboundBuffer { int count = -1; boolean cancelled; + public Object msg() { + return msg; + } + + /** + * Return {@code true} if the {@link Entry} was cancelled via {@link #cancel()} before, + * {@code false} otherwise. + */ + public boolean isCancelled() { + return cancelled; + } + + /** + * Cancel this {@link Entry} and the message that was hold by this {@link Entry}. This method returns the + * number of pending bytes for the cancelled message. + */ public int cancel() { if (!cancelled) { cancelled = true; @@ -670,16 +593,15 @@ public final class ChannelOutboundBuffer { pendingSize = 0; total = 0; progress = 0; - buffers = null; - buf = null; return pSize; } return 0; } + /** + * Clear this {@link Entry} and so release all resources. + */ public void clear() { - buffers = null; - buf = null; msg = null; promise = null; progress = 0; @@ -689,36 +611,4 @@ public final class ChannelOutboundBuffer { cancelled = false; } } - - static final class ThreadLocalPooledByteBuf extends UnpooledDirectByteBuf { - private final Recycler.Handle handle; - - private static final Recycler RECYCLER = new Recycler() { - @Override - protected ThreadLocalPooledByteBuf newObject(Handle handle) { - return new ThreadLocalPooledByteBuf(handle); - } - }; - - private ThreadLocalPooledByteBuf(Recycler.Handle handle) { - super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE); - this.handle = handle; - } - - static ThreadLocalPooledByteBuf newInstance() { - ThreadLocalPooledByteBuf buf = RECYCLER.get(); - buf.setRefCnt(1); - return buf; - } - - @Override - protected void deallocate() { - if (capacity() > threadLocalDirectBufferSize) { - super.deallocate(); - } else { - clear(); - RECYCLER.recycle(this, handle); - } - } - } } diff --git a/transport/src/main/java/io/netty/channel/ThreadLocalPooledDirectByteBuf.java b/transport/src/main/java/io/netty/channel/ThreadLocalPooledDirectByteBuf.java new file mode 100644 index 0000000000..d007625b5f --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ThreadLocalPooledDirectByteBuf.java @@ -0,0 +1,120 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +/* + * Written by Josh Bloch of Google Inc. and released to the public domain, + * as explained at http://creativecommons.org/publicdomain/zero/1.0/. + */ +package io.netty.channel; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.buffer.UnpooledDirectByteBuf; +import io.netty.buffer.UnpooledUnsafeDirectByteBuf; +import io.netty.util.Recycler; +import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.SystemPropertyUtil; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +final class ThreadLocalPooledDirectByteBuf { + private static final InternalLogger logger = + InternalLoggerFactory.getInstance(ThreadLocalPooledDirectByteBuf.class); + public static final int threadLocalDirectBufferSize; + + static { + threadLocalDirectBufferSize = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024); + logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", threadLocalDirectBufferSize); + } + + public static ByteBuf newInstance() { + if (PlatformDependent.hasUnsafe()) { + return ThreadLocalUnsafeDirectByteBuf.newInstance(); + } else { + return ThreadLocalDirectByteBuf.newInstance(); + } + } + + private ThreadLocalPooledDirectByteBuf() { + // utility + } + + private static final class ThreadLocalUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf { + + private static final Recycler RECYCLER = + new Recycler() { + @Override + protected ThreadLocalUnsafeDirectByteBuf newObject(Handle handle) { + return new ThreadLocalUnsafeDirectByteBuf(handle); + } + }; + + static ThreadLocalUnsafeDirectByteBuf newInstance() { + ThreadLocalUnsafeDirectByteBuf buf = RECYCLER.get(); + buf.setRefCnt(1); + return buf; + } + + private final Recycler.Handle handle; + + private ThreadLocalUnsafeDirectByteBuf(Recycler.Handle handle) { + super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE); + this.handle = handle; + } + + @Override + protected void deallocate() { + if (capacity() > threadLocalDirectBufferSize) { + super.deallocate(); + } else { + clear(); + RECYCLER.recycle(this, handle); + } + } + } + + private static final class ThreadLocalDirectByteBuf extends UnpooledDirectByteBuf { + + private static final Recycler RECYCLER = new Recycler() { + @Override + protected ThreadLocalDirectByteBuf newObject(Handle handle) { + return new ThreadLocalDirectByteBuf(handle); + } + }; + + static ThreadLocalDirectByteBuf newInstance() { + ThreadLocalDirectByteBuf buf = RECYCLER.get(); + buf.setRefCnt(1); + return buf; + } + + private final Recycler.Handle handle; + + private ThreadLocalDirectByteBuf(Recycler.Handle handle) { + super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE); + this.handle = handle; + } + + @Override + protected void deallocate() { + if (capacity() > threadLocalDirectBufferSize) { + super.deallocate(); + } else { + clear(); + RECYCLER.recycle(this, handle); + } + } + } +} diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 056f2d74cc..1bdec26edb 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -194,17 +194,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { continue; } - if (!buf.isDirect()) { - ByteBufAllocator alloc = alloc(); - if (alloc.isDirectBufferPooled()) { - // Non-direct buffers are copied into JDK's own internal direct buffer on every I/O. - // We can do a better job by using our pooled allocator. If the current allocator does not - // pool a direct buffer, we rely on JDK's direct buffer pool. - buf = alloc.directBuffer(readableBytes).writeBytes(buf); - in.current(buf); - } - } - boolean setOpWrite = false; boolean done = false; long flushedAmount = 0; diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index f88c85f25d..f3773feea1 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -56,6 +56,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { key.interestOps(interestOps & ~readInterestOp); } } + @Override public void read() { assert eventLoop().inEventLoop(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index c2619aba88..296dbd846d 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -16,7 +16,6 @@ package io.netty.channel.socket.nio; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufHolder; import io.netty.channel.AddressedEnvelope; import io.netty.channel.Channel; @@ -26,7 +25,6 @@ import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; -import io.netty.channel.DefaultAddressedEnvelope; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.socket.DatagramChannelConfig; @@ -260,20 +258,7 @@ public final class NioDatagramChannel return true; } - ByteBufAllocator alloc = alloc(); - boolean needsCopy = data.nioBufferCount() != 1; - if (!needsCopy) { - if (!data.isDirect() && alloc.isDirectBufferPooled()) { - needsCopy = true; - } - } - ByteBuffer nioData; - if (!needsCopy) { - nioData = data.nioBuffer(); - } else { - data = alloc.directBuffer(dataLen).writeBytes(data); - nioData = data.nioBuffer(); - } + ByteBuffer nioData = data.nioBuffer(); final int writtenBytes; if (remoteAddress != null) { @@ -283,22 +268,6 @@ public final class NioDatagramChannel } boolean done = writtenBytes > 0; - if (needsCopy) { - // This means we have allocated a new buffer and need to store it back so we not need to allocate it again - // later - if (remoteAddress == null) { - // remoteAddress is null which means we can handle it as ByteBuf directly - in.current(data); - } else { - if (!done) { - // store it back with all the needed informations - in.current(new DefaultAddressedEnvelope(data, remoteAddress)); - } else { - // Just store back the new create buffer so it is cleaned up once in.remove() is called. - in.current(data); - } - } - } return done; } @@ -537,4 +506,9 @@ public final class NioDatagramChannel } return promise; } + + @Override + protected ChannelOutboundBuffer newOutboundBuffer() { + return NioDatagramChannelOutboundBuffer.newInstance(this); + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelOutboundBuffer.java new file mode 100644 index 0000000000..b3bd3b676d --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelOutboundBuffer.java @@ -0,0 +1,65 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.nio; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.socket.DatagramPacket; +import io.netty.util.Recycler; + +/** + * Special {@link ChannelOutboundBuffer} for {@link NioDatagramChannel} implementations. + */ +final class NioDatagramChannelOutboundBuffer extends ChannelOutboundBuffer { + private static final Recycler RECYCLER = + new Recycler() { + @Override + protected NioDatagramChannelOutboundBuffer newObject(Handle handle) { + return new NioDatagramChannelOutboundBuffer(handle); + } + }; + + /** + * Get a new instance of this {@link NioSocketChannelOutboundBuffer} and attach it the given + * {@link .NioDatagramChannel}. + */ + static NioDatagramChannelOutboundBuffer newInstance(NioDatagramChannel channel) { + NioDatagramChannelOutboundBuffer buffer = RECYCLER.get(); + buffer.channel = channel; + return buffer; + } + + private NioDatagramChannelOutboundBuffer(Recycler.Handle handle) { + super(handle); + } + + /** + * Convert all non direct {@link ByteBuf} to direct {@link ByteBuf}'s. This is done as the JDK implementation + * will do the conversation itself and we can do a better job here. + */ + @Override + protected Object beforeAdd(Object msg) { + if (msg instanceof DatagramPacket) { + DatagramPacket packet = (DatagramPacket) msg; + ByteBuf content = packet.content(); + if (!content.isDirect() || content.nioBufferCount() != 1) { + ByteBuf direct = copyToDirectByteBuf(content); + return new DatagramPacket(direct, packet.recipient(), packet.sender()); + } + } + return msg; + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 665ce2945c..cd5e39373c 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -231,16 +231,16 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty super.doWrite(in); return; } - + NioSocketChannelOutboundBuffer nioIn = (NioSocketChannelOutboundBuffer) in; // Ensure the pending writes are made of ByteBufs only. - ByteBuffer[] nioBuffers = in.nioBuffers(); + ByteBuffer[] nioBuffers = nioIn.nioBuffers(); if (nioBuffers == null) { super.doWrite(in); return; } - int nioBufferCnt = in.nioBufferCount(); - long expectedWrittenBytes = in.nioBufferSize(); + int nioBufferCnt = nioIn.nioBufferCount(); + long expectedWrittenBytes = nioIn.nioBufferSize(); final SocketChannel ch = javaChannel(); long writtenBytes = 0; @@ -263,7 +263,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty if (done) { // Release all buffers for (int i = msgCount; i > 0; i --) { - in.remove(); + nioIn.remove(); } // Finish the write loop if no new messages were flushed by in.remove(). @@ -281,16 +281,16 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty final int readableBytes = buf.writerIndex() - readerIndex; if (readableBytes < writtenBytes) { - in.progress(readableBytes); - in.remove(); + nioIn.progress(readableBytes); + nioIn.remove(); writtenBytes -= readableBytes; } else if (readableBytes > writtenBytes) { buf.readerIndex(readerIndex + (int) writtenBytes); - in.progress(writtenBytes); + nioIn.progress(writtenBytes); break; } else { // readableBytes == writtenBytes - in.progress(readableBytes); - in.remove(); + nioIn.progress(readableBytes); + nioIn.remove(); break; } } @@ -300,4 +300,9 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty } } } + + @Override + protected ChannelOutboundBuffer newOutboundBuffer() { + return NioSocketChannelOutboundBuffer.newInstance(this); + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannelOutboundBuffer.java new file mode 100644 index 0000000000..767e1c2075 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannelOutboundBuffer.java @@ -0,0 +1,233 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +/* + * Written by Josh Bloch of Google Inc. and released to the public domain, + * as explained at http://creativecommons.org/publicdomain/zero/1.0/. + */ +package io.netty.channel.socket.nio; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.AbstractChannel; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.util.Recycler; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +/** + * Special {@link ChannelOutboundBuffer} implementation which allows to also access flushed {@link ByteBuffer} to + * allow efficent gathering writes. + */ +public final class NioSocketChannelOutboundBuffer extends ChannelOutboundBuffer { + + private ByteBuffer[] nioBuffers; + private int nioBufferCount; + private long nioBufferSize; + + private static final Recycler RECYCLER = + new Recycler() { + @Override + protected NioSocketChannelOutboundBuffer newObject(Handle handle) { + return new NioSocketChannelOutboundBuffer(handle); + } + }; + + /** + * Get a new instance of this {@link NioSocketChannelOutboundBuffer} and attach it the given {@link AbstractChannel} + */ + public static NioSocketChannelOutboundBuffer newInstance(AbstractChannel channel) { + NioSocketChannelOutboundBuffer buffer = RECYCLER.get(); + buffer.channel = channel; + return buffer; + } + + private NioSocketChannelOutboundBuffer(Recycler.Handle handle) { + super(handle); + nioBuffers = new ByteBuffer[INITIAL_CAPACITY]; + } + + /** + * Convert all non direct {@link ByteBuf} to direct {@link ByteBuf}'s. This is done as the JDK implementation + * will do the conversation itself and we can do a better job here. + */ + @Override + protected Object beforeAdd(Object msg) { + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + if (!buf.isDirect()) { + return copyToDirectByteBuf(buf); + } + } + return msg; + } + + /** + * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only. + * {@code null} is returned otherwise. If this method returns a non-null array, {@link #nioBufferCount()} and + * {@link #nioBufferSize()} will return the number of NIO buffers in the returned array and the total number + * of readable bytes of the NIO buffers respectively. + *

+ * Note that the returned array is reused and thus should not escape + * {@link io.netty.channel.AbstractChannel#doWrite(ChannelOutboundBuffer)}. + * Refer to {@link io.netty.channel.socket.nio.NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example. + *

+ */ + public ByteBuffer[] nioBuffers() { + long nioBufferSize = 0; + int nioBufferCount = 0; + final Entry[] buffer = entries(); + final int mask = buffer.length - 1; + ByteBuffer[] nioBuffers = this.nioBuffers; + Object m; + int unflushed = unflushed(); + int i = flushed(); + while (i != unflushed && (m = buffer[i].msg()) != null) { + if (!(m instanceof ByteBuf)) { + this.nioBufferCount = 0; + this.nioBufferSize = 0; + return null; + } + + NioEntry entry = (NioEntry) buffer[i]; + + if (!entry.isCancelled()) { + ByteBuf buf = (ByteBuf) m; + final int readerIndex = buf.readerIndex(); + final int readableBytes = buf.writerIndex() - readerIndex; + + if (readableBytes > 0) { + nioBufferSize += readableBytes; + int count = entry.count; + if (count == -1) { + //noinspection ConstantValueVariableUse + entry.count = count = buf.nioBufferCount(); + } + int neededSpace = nioBufferCount + count; + if (neededSpace > nioBuffers.length) { + this.nioBuffers = nioBuffers = + expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); + } + if (count == 1) { + ByteBuffer nioBuf = entry.buf; + if (nioBuf == null) { + // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a + // derived buffer + entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); + } + nioBuffers[nioBufferCount ++] = nioBuf; + } else { + ByteBuffer[] nioBufs = entry.buffers; + if (nioBufs == null) { + // cached ByteBuffers as they may be expensive to create in terms + // of Object allocation + entry.buffers = nioBufs = buf.nioBuffers(); + } + nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount); + } + } + } + + i = i + 1 & mask; + } + this.nioBufferCount = nioBufferCount; + this.nioBufferSize = nioBufferSize; + + return nioBuffers; + } + + private static int fillBufferArray(ByteBuffer[] nioBufs, ByteBuffer[] nioBuffers, int nioBufferCount) { + for (ByteBuffer nioBuf: nioBufs) { + if (nioBuf == null) { + break; + } + nioBuffers[nioBufferCount ++] = nioBuf; + } + return nioBufferCount; + } + + private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) { + int newCapacity = array.length; + do { + // double capacity until it is big enough + // See https://github.com/netty/netty/issues/1890 + newCapacity <<= 1; + + if (newCapacity < 0) { + throw new IllegalStateException(); + } + + } while (neededSpace > newCapacity); + + ByteBuffer[] newArray = new ByteBuffer[newCapacity]; + System.arraycopy(array, 0, newArray, 0, size); + + return newArray; + } + + /** + * Return the number of {@link java.nio.ByteBuffer} which can be written. + */ + public int nioBufferCount() { + return nioBufferCount; + } + + /** + * Return the number of bytes that can be written via gathering writes. + */ + public long nioBufferSize() { + return nioBufferSize; + } + + @Override + public void recycle() { + // take care of recycle the ByteBuffer[] structure. + if (nioBuffers.length > INITIAL_CAPACITY) { + nioBuffers = new ByteBuffer[INITIAL_CAPACITY]; + } else { + // null out the nio buffers array so the can be GC'ed + // https://github.com/netty/netty/issues/1763 + Arrays.fill(nioBuffers, null); + } + super.recycle(); + } + + @Override + protected NioEntry newEntry() { + return new NioEntry(); + } + + protected static final class NioEntry extends Entry { + ByteBuffer[] buffers; + ByteBuffer buf; + int count = -1; + + @Override + public void clear() { + buffers = null; + buf = null; + count = -1; + super.clear(); + } + + @Override + public int cancel() { + buffers = null; + buf = null; + count = -1; + return super.cancel(); + } + } +} diff --git a/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java b/transport/src/test/java/io/netty/channel/socket/nio/NioSocketChannelOutboundBufferTest.java similarity index 89% rename from transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java rename to transport/src/test/java/io/netty/channel/socket/nio/NioSocketChannelOutboundBufferTest.java index e4abbf6b10..7f825d4b0e 100644 --- a/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java +++ b/transport/src/test/java/io/netty/channel/socket/nio/NioSocketChannelOutboundBufferTest.java @@ -13,10 +13,12 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.netty.channel; +package io.netty.channel.socket.nio; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; +import io.netty.channel.AbstractChannel; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.util.CharsetUtil; import org.junit.Test; @@ -26,12 +28,12 @@ import java.nio.ByteBuffer; import static io.netty.buffer.Unpooled.*; import static org.junit.Assert.*; -public class ChannelOutboundBufferTest { +public class NioSocketChannelOutboundBufferTest { @Test public void testEmptyNioBuffers() { AbstractChannel channel = new EmbeddedChannel(); - ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel); + NioSocketChannelOutboundBuffer buffer = NioSocketChannelOutboundBuffer.newInstance(channel); assertEquals(0, buffer.nioBufferCount()); ByteBuffer[] buffers = buffer.nioBuffers(); assertEquals(32, buffers.length); @@ -45,7 +47,7 @@ public class ChannelOutboundBufferTest { @Test public void testNioBuffersSingleBacked() { AbstractChannel channel = new EmbeddedChannel(); - ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel); + NioSocketChannelOutboundBuffer buffer = NioSocketChannelOutboundBuffer.newInstance(channel); assertEquals(0, buffer.nioBufferCount()); ByteBuffer[] buffers = buffer.nioBuffers(); assertEquals(32, buffers.length); @@ -79,7 +81,7 @@ public class ChannelOutboundBufferTest { @Test public void testNioBuffersExpand() { AbstractChannel channel = new EmbeddedChannel(); - ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel); + NioSocketChannelOutboundBuffer buffer = NioSocketChannelOutboundBuffer.newInstance(channel); ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII)); for (int i = 0; i < 64; i++) { @@ -104,7 +106,7 @@ public class ChannelOutboundBufferTest { @Test public void testNioBuffersExpand2() { AbstractChannel channel = new EmbeddedChannel(); - ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel); + NioSocketChannelOutboundBuffer buffer = NioSocketChannelOutboundBuffer.newInstance(channel); CompositeByteBuf comp = compositeBuffer(256); ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII));