[#1890] Correctly expand ByteBuffer array in all cases

The problem was that we did not handle the case correctly when doubling the array was not enough. We need to keep doubling until everything fits in.
This commit is contained in:
Norman Maurer 2013-10-06 15:29:35 +02:00
parent 2366c2846d
commit 7ad3c1e10c
2 changed files with 227 additions and 7 deletions

View File

@ -380,8 +380,9 @@ public final class ChannelOutboundBuffer {
if (count == -1) {
entry.count = count = buf.nioBufferCount();
}
if (nioBufferCount + count > nioBuffers.length) {
this.nioBuffers = nioBuffers = doubleNioBufferArray(nioBuffers, nioBufferCount);
int neededSpace = nioBufferCount + count;
if (neededSpace > nioBuffers.length) {
this.nioBuffers = nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
}
if (buf.isDirect() || threadLocalDirectBufferSize <= 0) {
@ -442,11 +443,18 @@ public final class ChannelOutboundBuffer {
return nioBufferCount;
}
private static ByteBuffer[] doubleNioBufferArray(ByteBuffer[] array, int size) {
int newCapacity = array.length << 1;
if (newCapacity < 0) {
throw new IllegalStateException();
}
private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
int newCapacity = array.length;
do {
// double capacity until it is big enough
// See https://github.com/netty/netty/issues/1890
newCapacity <<= 1;
if (newCapacity < 0) {
throw new IllegalStateException();
}
} while (neededSpace > newCapacity);
ByteBuffer[] newArray = new ByteBuffer[newCapacity];
System.arraycopy(array, 0, newArray, 0, size);

View File

@ -0,0 +1,212 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.util.CharsetUtil;
import org.junit.Test;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import static io.netty.buffer.Unpooled.*;
import static org.junit.Assert.*;
public class ChannelOutboundBufferTest {
@Test
public void testEmptyNioBuffers() {
TestChannel channel = new TestChannel();
ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel);
assertEquals(0, buffer.nioBufferCount());
ByteBuffer[] buffers = buffer.nioBuffers();
assertEquals(32, buffers.length);
for (ByteBuffer b: buffers) {
assertNull(b);
}
assertEquals(0, buffer.nioBufferCount());
}
@Test
public void testNioBuffersSingleBacked() {
TestChannel channel = new TestChannel();
ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel);
assertEquals(0, buffer.nioBufferCount());
ByteBuffer[] buffers = buffer.nioBuffers();
assertEquals(32, buffers.length);
for (ByteBuffer b: buffers) {
assertNull(b);
}
assertEquals(0, buffer.nioBufferCount());
ByteBuf buf = copiedBuffer("buf1", CharsetUtil.US_ASCII);
buffer.addMessage(buf, channel.voidPromise());
buffers = buffer.nioBuffers();
assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount());
for (ByteBuffer b: buffers) {
assertNull(b);
}
buffer.addFlush();
buffers = buffer.nioBuffers();
assertEquals(32, buffers.length);
assertEquals("Should still be 0 as not flushed yet", 1, buffer.nioBufferCount());
for (int i = 0; i < buffers.length; i++) {
if (i == 0) {
assertEquals(buffers[i], buf.internalNioBuffer(0, buf.readableBytes()));
} else {
assertNull(buffers[i]);
}
}
}
@Test
public void testNioBuffersExpand() {
TestChannel channel = new TestChannel();
ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel);
ByteBuf buf = copiedBuffer("buf1", CharsetUtil.US_ASCII);
for (int i = 0; i < 64; i++) {
buffer.addMessage(buf.copy(), channel.voidPromise());
}
ByteBuffer[] buffers = buffer.nioBuffers();
assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount());
for (ByteBuffer b: buffers) {
assertNull(b);
}
buffer.addFlush();
buffers = buffer.nioBuffers();
assertEquals(64, buffers.length);
assertEquals(64, buffer.nioBufferCount());
for (int i = 0; i < buffers.length; i++) {
assertEquals(buffers[i], buf.internalNioBuffer(0, buf.readableBytes()));
}
}
@Test
public void testNioBuffersExpand2() {
TestChannel channel = new TestChannel();
ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel);
CompositeByteBuf comp = compositeBuffer(256);
ByteBuf buf = copiedBuffer("buf1", CharsetUtil.US_ASCII);
for (int i = 0; i < 65; i++) {
comp.addComponent(buf.copy()).writerIndex(comp.writerIndex() + buf.readableBytes());
}
System.out.println(comp.nioBufferCount());
buffer.addMessage(comp, channel.voidPromise());
ByteBuffer[] buffers = buffer.nioBuffers();
assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount());
for (ByteBuffer b: buffers) {
assertNull(b);
}
buffer.addFlush();
buffers = buffer.nioBuffers();
assertEquals(128, buffers.length);
assertEquals(65, buffer.nioBufferCount());
for (int i = 0; i < buffers.length; i++) {
if (i < 65) {
assertEquals(buffers[i], buf.internalNioBuffer(0, buf.readableBytes()));
} else {
assertNull(buffers[i]);
}
}
}
private static final class TestChannel extends AbstractChannel {
private final ChannelConfig config = new DefaultChannelConfig(this);
TestChannel() {
super(null);
}
@Override
protected AbstractUnsafe newUnsafe() {
return new TestUnsafe();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return false;
}
@Override
protected SocketAddress localAddress0() {
throw new UnsupportedOperationException();
}
@Override
protected SocketAddress remoteAddress0() {
throw new UnsupportedOperationException();
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doClose() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doBeginRead() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public ChannelConfig config() {
return config;
}
@Override
public boolean isOpen() {
return true;
}
@Override
public boolean isActive() {
return true;
}
@Override
public ChannelMetadata metadata() {
throw new UnsupportedOperationException();
}
final class TestUnsafe extends AbstractUnsafe {
@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
throw new UnsupportedOperationException();
}
}
}
}