diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 6d5258599e..f33cfb3efa 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -192,7 +192,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So in.progress(buf.readableBytes()); in.remove(); } - in.progress(writtenBytes); } else { // Did not write all buffers completely. // Release the fully written buffers and update the indexes of the partially written buffer. diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index 491be7f6eb..932dba47ea 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -30,6 +30,7 @@ import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.FastThreadLocal; +import io.netty.util.internal.InternalThreadLocalMap; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; @@ -325,13 +326,13 @@ public final class ChannelOutboundBuffer { } private void removeEntry(Entry e) { - if (e == tailEntry) { - // processed everything - tailEntry = null; - unflushedEntry = null; - } if (-- flushed == 0) { + // processed everything flushedEntry = null; + if (e == tailEntry) { + tailEntry = null; + unflushedEntry = null; + } } else { flushedEntry = e.next; } @@ -352,7 +353,8 @@ public final class ChannelOutboundBuffer { long nioBufferSize = 0; int nioBufferCount = 0; final ByteBufAllocator alloc = channel.alloc(); - ByteBuffer[] nioBuffers = NIO_BUFFERS.get(); + final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); + ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap); Entry entry = flushedEntry; while (entry != null && entry.msg instanceof ByteBuf) { if (!entry.cancelled) { @@ -369,7 +371,8 @@ public final class ChannelOutboundBuffer { } int neededSpace = nioBufferCount + count; if (neededSpace > nioBuffers.length) { - break; + nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); + NIO_BUFFERS.set(threadLocalMap, nioBuffers); } if (buf.isDirect() || threadLocalDirectBufferSize <= 0) { if (count == 1) { @@ -431,6 +434,25 @@ public final class ChannelOutboundBuffer { return nioBufferCount; } + private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) { + int newCapacity = array.length; + do { + // double capacity until it is big enough + // See https://github.com/netty/netty/issues/1890 + newCapacity <<= 1; + + if (newCapacity < 0) { + throw new IllegalStateException(); + } + + } while (neededSpace > newCapacity); + + ByteBuffer[] newArray = new ByteBuffer[newCapacity]; + System.arraycopy(array, 0, newArray, 0, size); + + return newArray; + } + public int nioBufferCount() { return nioBufferCount; }