diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index 393aad8246..0dbdccaaa4 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -380,8 +380,9 @@ public final class ChannelOutboundBuffer { if (count == -1) { entry.count = count = buf.nioBufferCount(); } - if (nioBufferCount + count > nioBuffers.length) { - this.nioBuffers = nioBuffers = doubleNioBufferArray(nioBuffers, nioBufferCount); + int neededSpace = nioBufferCount + count; + if (neededSpace > nioBuffers.length) { + this.nioBuffers = nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); } if (buf.isDirect() || threadLocalDirectBufferSize <= 0) { @@ -442,11 +443,18 @@ public final class ChannelOutboundBuffer { return nioBufferCount; } - private static ByteBuffer[] doubleNioBufferArray(ByteBuffer[] array, int size) { - int newCapacity = array.length << 1; - if (newCapacity < 0) { - throw new IllegalStateException(); - } + private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) { + int newCapacity = array.length; + do { + // double capacity until it is big enough + // See https://github.com/netty/netty/issues/1890 + newCapacity <<= 1; + + if (newCapacity < 0) { + throw new IllegalStateException(); + } + + } while (neededSpace > newCapacity); ByteBuffer[] newArray = new ByteBuffer[newCapacity]; System.arraycopy(array, 0, newArray, 0, size); diff --git a/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java b/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java new file mode 100644 index 0000000000..ecd0a0a840 --- /dev/null +++ b/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java @@ -0,0 +1,212 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.util.CharsetUtil; +import org.junit.Test; + +import java.net.SocketAddress; +import java.nio.ByteBuffer; + +import static io.netty.buffer.Unpooled.*; +import static org.junit.Assert.*; + +public class ChannelOutboundBufferTest { + + @Test + public void testEmptyNioBuffers() { + TestChannel channel = new TestChannel(); + ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel); + assertEquals(0, buffer.nioBufferCount()); + ByteBuffer[] buffers = buffer.nioBuffers(); + assertEquals(32, buffers.length); + for (ByteBuffer b: buffers) { + assertNull(b); + } + assertEquals(0, buffer.nioBufferCount()); + } + + @Test + public void testNioBuffersSingleBacked() { + TestChannel channel = new TestChannel(); + + ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel); + assertEquals(0, buffer.nioBufferCount()); + ByteBuffer[] buffers = buffer.nioBuffers(); + assertEquals(32, buffers.length); + for (ByteBuffer b: buffers) { + assertNull(b); + } + assertEquals(0, buffer.nioBufferCount()); + + ByteBuf buf = copiedBuffer("buf1", CharsetUtil.US_ASCII); + buffer.addMessage(buf, channel.voidPromise()); + buffers = buffer.nioBuffers(); + assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount()); + for (ByteBuffer b: buffers) { + assertNull(b); + } + buffer.addFlush(); + buffers = buffer.nioBuffers(); + assertEquals(32, buffers.length); + assertEquals("Should still be 0 as not flushed yet", 1, buffer.nioBufferCount()); + for (int i = 0; i < buffers.length; i++) { + if (i == 0) { + assertEquals(buffers[i], buf.internalNioBuffer(0, buf.readableBytes())); + } else { + assertNull(buffers[i]); + } + } + } + + @Test + public void testNioBuffersExpand() { + TestChannel channel = new TestChannel(); + + ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel); + + ByteBuf buf = copiedBuffer("buf1", CharsetUtil.US_ASCII); + for (int i = 0; i < 64; i++) { + buffer.addMessage(buf.copy(), channel.voidPromise()); + } + ByteBuffer[] buffers = buffer.nioBuffers(); + assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount()); + for (ByteBuffer b: buffers) { + assertNull(b); + } + buffer.addFlush(); + buffers = buffer.nioBuffers(); + assertEquals(64, buffers.length); + assertEquals(64, buffer.nioBufferCount()); + for (int i = 0; i < buffers.length; i++) { + assertEquals(buffers[i], buf.internalNioBuffer(0, buf.readableBytes())); + } + } + + @Test + public void testNioBuffersExpand2() { + TestChannel channel = new TestChannel(); + + ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel); + + CompositeByteBuf comp = compositeBuffer(256); + ByteBuf buf = copiedBuffer("buf1", CharsetUtil.US_ASCII); + for (int i = 0; i < 65; i++) { + comp.addComponent(buf.copy()).writerIndex(comp.writerIndex() + buf.readableBytes()); + } + System.out.println(comp.nioBufferCount()); + buffer.addMessage(comp, channel.voidPromise()); + + ByteBuffer[] buffers = buffer.nioBuffers(); + assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount()); + for (ByteBuffer b: buffers) { + assertNull(b); + } + buffer.addFlush(); + buffers = buffer.nioBuffers(); + assertEquals(128, buffers.length); + assertEquals(65, buffer.nioBufferCount()); + for (int i = 0; i < buffers.length; i++) { + if (i < 65) { + assertEquals(buffers[i], buf.internalNioBuffer(0, buf.readableBytes())); + } else { + assertNull(buffers[i]); + } + } + } + + private static final class TestChannel extends AbstractChannel { + private final ChannelConfig config = new DefaultChannelConfig(this); + + TestChannel() { + super(null); + } + + @Override + protected AbstractUnsafe newUnsafe() { + return new TestUnsafe(); + } + + @Override + protected boolean isCompatible(EventLoop loop) { + return false; + } + + @Override + protected SocketAddress localAddress0() { + throw new UnsupportedOperationException(); + } + + @Override + protected SocketAddress remoteAddress0() { + throw new UnsupportedOperationException(); + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected void doDisconnect() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected void doClose() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected void doBeginRead() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected void doWrite(ChannelOutboundBuffer in) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public ChannelConfig config() { + return config; + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public boolean isActive() { + return true; + } + + @Override + public ChannelMetadata metadata() { + throw new UnsupportedOperationException(); + } + + final class TestUnsafe extends AbstractUnsafe { + @Override + public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + throw new UnsupportedOperationException(); + } + } + } +}