From 7ad3c1e10cc01253437dc1fce8324bb685a13892 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sun, 6 Oct 2013 15:29:35 +0200 Subject: [PATCH] [#1890] Correctly expand ByteBuffer array in all cases The problem was that we did not handle the case correctly when doubling the array was not enough. We need to keep doubling until everything fits in. --- .../netty/channel/ChannelOutboundBuffer.java | 22 +- .../channel/ChannelOutboundBufferTest.java | 212 ++++++++++++++++++ 2 files changed, 227 insertions(+), 7 deletions(-) create mode 100644 transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index 393aad8246..0dbdccaaa4 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -380,8 +380,9 @@ public final class ChannelOutboundBuffer { if (count == -1) { entry.count = count = buf.nioBufferCount(); } - if (nioBufferCount + count > nioBuffers.length) { - this.nioBuffers = nioBuffers = doubleNioBufferArray(nioBuffers, nioBufferCount); + int neededSpace = nioBufferCount + count; + if (neededSpace > nioBuffers.length) { + this.nioBuffers = nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); } if (buf.isDirect() || threadLocalDirectBufferSize <= 0) { @@ -442,11 +443,18 @@ public final class ChannelOutboundBuffer { return nioBufferCount; } - private static ByteBuffer[] doubleNioBufferArray(ByteBuffer[] array, int size) { - int newCapacity = array.length << 1; - if (newCapacity < 0) { - throw new IllegalStateException(); - } + private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) { + int newCapacity = array.length; + do { + // double capacity until it is big enough + // See https://github.com/netty/netty/issues/1890 + newCapacity <<= 1; + + if (newCapacity < 0) { + throw new IllegalStateException(); + } + + } while (neededSpace > newCapacity); ByteBuffer[] newArray = new ByteBuffer[newCapacity]; System.arraycopy(array, 0, newArray, 0, size); diff --git a/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java b/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java new file mode 100644 index 0000000000..ecd0a0a840 --- /dev/null +++ b/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java @@ -0,0 +1,212 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.util.CharsetUtil; +import org.junit.Test; + +import java.net.SocketAddress; +import java.nio.ByteBuffer; + +import static io.netty.buffer.Unpooled.*; +import static org.junit.Assert.*; + +public class ChannelOutboundBufferTest { + + @Test + public void testEmptyNioBuffers() { + TestChannel channel = new TestChannel(); + ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel); + assertEquals(0, buffer.nioBufferCount()); + ByteBuffer[] buffers = buffer.nioBuffers(); + assertEquals(32, buffers.length); + for (ByteBuffer b: buffers) { + assertNull(b); + } + assertEquals(0, buffer.nioBufferCount()); + } + + @Test + public void testNioBuffersSingleBacked() { + TestChannel channel = new TestChannel(); + + ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel); + assertEquals(0, buffer.nioBufferCount()); + ByteBuffer[] buffers = buffer.nioBuffers(); + assertEquals(32, buffers.length); + for (ByteBuffer b: buffers) { + assertNull(b); + } + assertEquals(0, buffer.nioBufferCount()); + + ByteBuf buf = copiedBuffer("buf1", CharsetUtil.US_ASCII); + buffer.addMessage(buf, channel.voidPromise()); + buffers = buffer.nioBuffers(); + assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount()); + for (ByteBuffer b: buffers) { + assertNull(b); + } + buffer.addFlush(); + buffers = buffer.nioBuffers(); + assertEquals(32, buffers.length); + assertEquals("Should still be 0 as not flushed yet", 1, buffer.nioBufferCount()); + for (int i = 0; i < buffers.length; i++) { + if (i == 0) { + assertEquals(buffers[i], buf.internalNioBuffer(0, buf.readableBytes())); + } else { + assertNull(buffers[i]); + } + } + } + + @Test + public void testNioBuffersExpand() { + TestChannel channel = new TestChannel(); + + ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel); + + ByteBuf buf = copiedBuffer("buf1", CharsetUtil.US_ASCII); + for (int i = 0; i < 64; i++) { + buffer.addMessage(buf.copy(), channel.voidPromise()); + } + ByteBuffer[] buffers = buffer.nioBuffers(); + assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount()); + for (ByteBuffer b: buffers) { + assertNull(b); + } + buffer.addFlush(); + buffers = buffer.nioBuffers(); + assertEquals(64, buffers.length); + assertEquals(64, buffer.nioBufferCount()); + for (int i = 0; i < buffers.length; i++) { + assertEquals(buffers[i], buf.internalNioBuffer(0, buf.readableBytes())); + } + } + + @Test + public void testNioBuffersExpand2() { + TestChannel channel = new TestChannel(); + + ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel); + + CompositeByteBuf comp = compositeBuffer(256); + ByteBuf buf = copiedBuffer("buf1", CharsetUtil.US_ASCII); + for (int i = 0; i < 65; i++) { + comp.addComponent(buf.copy()).writerIndex(comp.writerIndex() + buf.readableBytes()); + } + System.out.println(comp.nioBufferCount()); + buffer.addMessage(comp, channel.voidPromise()); + + ByteBuffer[] buffers = buffer.nioBuffers(); + assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount()); + for (ByteBuffer b: buffers) { + assertNull(b); + } + buffer.addFlush(); + buffers = buffer.nioBuffers(); + assertEquals(128, buffers.length); + assertEquals(65, buffer.nioBufferCount()); + for (int i = 0; i < buffers.length; i++) { + if (i < 65) { + assertEquals(buffers[i], buf.internalNioBuffer(0, buf.readableBytes())); + } else { + assertNull(buffers[i]); + } + } + } + + private static final class TestChannel extends AbstractChannel { + private final ChannelConfig config = new DefaultChannelConfig(this); + + TestChannel() { + super(null); + } + + @Override + protected AbstractUnsafe newUnsafe() { + return new TestUnsafe(); + } + + @Override + protected boolean isCompatible(EventLoop loop) { + return false; + } + + @Override + protected SocketAddress localAddress0() { + throw new UnsupportedOperationException(); + } + + @Override + protected SocketAddress remoteAddress0() { + throw new UnsupportedOperationException(); + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected void doDisconnect() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected void doClose() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected void doBeginRead() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected void doWrite(ChannelOutboundBuffer in) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public ChannelConfig config() { + return config; + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public boolean isActive() { + return true; + } + + @Override + public ChannelMetadata metadata() { + throw new UnsupportedOperationException(); + } + + final class TestUnsafe extends AbstractUnsafe { + @Override + public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + throw new UnsupportedOperationException(); + } + } + } +}