[#530] Allow using a bounded ByteBuf as the first inbound buffer

This commit is contained in:
Trustin Lee 2012-08-19 13:55:12 +09:00
parent 11c742f392
commit 9e75a33d3d
4 changed files with 97 additions and 20 deletions

View File

@ -45,8 +45,21 @@ public class SocketEchoTest extends AbstractSocketTest {
}
public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
EchoHandler sh = new EchoHandler();
EchoHandler ch = new EchoHandler();
testSimpleEcho0(sb, cb, Integer.MAX_VALUE);
}
@Test
public void testSimpleEchoWithBoundedBuffer() throws Throwable {
run();
}
public void testSimpleEchoWithBoundedBuffer(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, 4);
}
private static void testSimpleEcho0(ServerBootstrap sb, Bootstrap cb, int maxInboundBufferSize) throws Throwable {
EchoHandler sh = new EchoHandler(maxInboundBufferSize);
EchoHandler ch = new EchoHandler(maxInboundBufferSize);
sb.childHandler(sh);
cb.handler(ch);
@ -109,11 +122,18 @@ public class SocketEchoTest extends AbstractSocketTest {
}
private static class EchoHandler extends ChannelInboundByteHandlerAdapter {
private final int maxInboundBufferSize;
volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter;
EchoHandler() {
EchoHandler(int maxInboundBufferSize) {
this.maxInboundBufferSize = maxInboundBufferSize;
}
@Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return Unpooled.buffer(0, maxInboundBufferSize);
}
@Override

View File

@ -142,12 +142,30 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
}
private static boolean expandReadBuffer(ByteBuf byteBuf) {
if (!byteBuf.writable()) {
// FIXME: Magic number
byteBuf.ensureWritableBytes(4096);
final int maxCapacity = byteBuf.maxCapacity();
final int capacity = byteBuf.capacity();
if (capacity == maxCapacity) {
return false;
}
// FIXME: Magic number
final int increment = 4096;
final int writerIndex = byteBuf.writerIndex();
if (writerIndex != capacity) {
// No need to expand because there's a room in the buffer.
return false;
}
// Expand to maximum capacity.
if (writerIndex + increment > maxCapacity) {
byteBuf.capacity(maxCapacity);
return true;
}
return false;
// Expand by the increment.
byteBuf.ensureWritableBytes(increment);
return true;
}
@Override
@ -211,10 +229,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
ByteBuf byteBuf = pipeline().inboundByteBuffer();
if (!byteBuf.readable()) {
byteBuf.discardReadBytes();
} else {
expandReadBuffer(byteBuf);
}
expandReadBuffer(byteBuf);
if (byteBuf.hasNioBuffers()) {
ByteBuffer[] buffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes());
javaChannel().read(buffers, 0, buffers.length, config.getWriteTimeout(),

View File

@ -101,12 +101,29 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel {
protected abstract int doWriteBytes(ByteBuf buf, boolean lastSpin) throws Exception;
private static boolean expandReadBuffer(ByteBuf byteBuf) {
if (!byteBuf.writable()) {
// FIXME: Magic number
byteBuf.ensureWritableBytes(4096);
final int maxCapacity = byteBuf.maxCapacity();
final int capacity = byteBuf.capacity();
if (capacity == maxCapacity) {
return false;
}
// FIXME: Magic number
final int increment = 4096;
final int writerIndex = byteBuf.writerIndex();
if (writerIndex != capacity) {
// No need to expand because there's a room in the buffer.
return false;
}
// Expand to maximum capacity.
if (writerIndex + increment > maxCapacity) {
byteBuf.capacity(maxCapacity);
return true;
}
return false;
// Expand by the increment.
byteBuf.ensureWritableBytes(increment);
return true;
}
}

View File

@ -66,14 +66,36 @@ abstract class AbstractOioByteChannel extends AbstractOioChannel {
}
}
private void expandReadBuffer(ByteBuf byteBuf) {
int available = available();
if (available > 0) {
byteBuf.ensureWritableBytes(available);
} else if (!byteBuf.writable()) {
// FIXME: Magic number
byteBuf.ensureWritableBytes(4096);
private boolean expandReadBuffer(ByteBuf byteBuf) {
final int maxCapacity = byteBuf.maxCapacity();
final int capacity = byteBuf.capacity();
if (capacity == maxCapacity) {
return false;
}
final int available = available();
final int writerIndex = byteBuf.writerIndex();
if (available > 0) {
if (writerIndex + available > maxCapacity) {
byteBuf.capacity(maxCapacity);
} else {
byteBuf.ensureWritableBytes(available);
}
return true;
}
if (writerIndex != capacity) {
return false;
}
// FIXME: magic number
final int increment = 4096;
if (writerIndex + increment > maxCapacity) {
byteBuf.capacity(maxCapacity);
} else {
byteBuf.ensureWritableBytes(increment);
}
return true;
}
}