From 9e75a33d3d21a300481dbddb33dce6243bdbb8a7 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Sun, 19 Aug 2012 13:55:12 +0900 Subject: [PATCH] [#530] Allow using a bounded ByteBuf as the first inbound buffer --- .../transport/socket/SocketEchoTest.java | 26 ++++++++++++-- .../channel/socket/aio/AioSocketChannel.java | 30 ++++++++++++---- .../socket/nio/AbstractNioByteChannel.java | 25 ++++++++++--- .../socket/oio/AbstractOioByteChannel.java | 36 +++++++++++++++---- 4 files changed, 97 insertions(+), 20 deletions(-) diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java index 2f7ec7840f..1b9c96d1ed 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java @@ -45,8 +45,21 @@ public class SocketEchoTest extends AbstractSocketTest { } public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable { - EchoHandler sh = new EchoHandler(); - EchoHandler ch = new EchoHandler(); + testSimpleEcho0(sb, cb, Integer.MAX_VALUE); + } + + @Test + public void testSimpleEchoWithBoundedBuffer() throws Throwable { + run(); + } + + public void testSimpleEchoWithBoundedBuffer(ServerBootstrap sb, Bootstrap cb) throws Throwable { + testSimpleEcho0(sb, cb, 4); + } + + private static void testSimpleEcho0(ServerBootstrap sb, Bootstrap cb, int maxInboundBufferSize) throws Throwable { + EchoHandler sh = new EchoHandler(maxInboundBufferSize); + EchoHandler ch = new EchoHandler(maxInboundBufferSize); sb.childHandler(sh); cb.handler(ch); @@ -109,11 +122,18 @@ public class SocketEchoTest extends AbstractSocketTest { } private static class EchoHandler extends ChannelInboundByteHandlerAdapter { + private final int maxInboundBufferSize; volatile Channel channel; final AtomicReference exception = new AtomicReference(); volatile int counter; - EchoHandler() { + EchoHandler(int maxInboundBufferSize) { + this.maxInboundBufferSize = maxInboundBufferSize; + } + + @Override + public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { + return Unpooled.buffer(0, maxInboundBufferSize); } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java index c82269b82a..ef4ebbe207 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -142,12 +142,30 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } private static boolean expandReadBuffer(ByteBuf byteBuf) { - if (!byteBuf.writable()) { - // FIXME: Magic number - byteBuf.ensureWritableBytes(4096); + final int maxCapacity = byteBuf.maxCapacity(); + final int capacity = byteBuf.capacity(); + if (capacity == maxCapacity) { + return false; + } + + // FIXME: Magic number + final int increment = 4096; + + final int writerIndex = byteBuf.writerIndex(); + if (writerIndex != capacity) { + // No need to expand because there's a room in the buffer. + return false; + } + + // Expand to maximum capacity. + if (writerIndex + increment > maxCapacity) { + byteBuf.capacity(maxCapacity); return true; } - return false; + + // Expand by the increment. + byteBuf.ensureWritableBytes(increment); + return true; } @Override @@ -211,10 +229,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne ByteBuf byteBuf = pipeline().inboundByteBuffer(); if (!byteBuf.readable()) { byteBuf.discardReadBytes(); - } else { - expandReadBuffer(byteBuf); } + expandReadBuffer(byteBuf); + if (byteBuf.hasNioBuffers()) { ByteBuffer[] buffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes()); javaChannel().read(buffers, 0, buffers.length, config.getWriteTimeout(), diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java index c3b8c4364a..5b5ed33aae 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java @@ -101,12 +101,29 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel { protected abstract int doWriteBytes(ByteBuf buf, boolean lastSpin) throws Exception; private static boolean expandReadBuffer(ByteBuf byteBuf) { - if (!byteBuf.writable()) { - // FIXME: Magic number - byteBuf.ensureWritableBytes(4096); + final int maxCapacity = byteBuf.maxCapacity(); + final int capacity = byteBuf.capacity(); + if (capacity == maxCapacity) { + return false; + } + + // FIXME: Magic number + final int increment = 4096; + + final int writerIndex = byteBuf.writerIndex(); + if (writerIndex != capacity) { + // No need to expand because there's a room in the buffer. + return false; + } + + // Expand to maximum capacity. + if (writerIndex + increment > maxCapacity) { + byteBuf.capacity(maxCapacity); return true; } - return false; + // Expand by the increment. + byteBuf.ensureWritableBytes(increment); + return true; } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java index 69d9c75927..ba9ac0630a 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java @@ -66,14 +66,36 @@ abstract class AbstractOioByteChannel extends AbstractOioChannel { } } - private void expandReadBuffer(ByteBuf byteBuf) { - int available = available(); - if (available > 0) { - byteBuf.ensureWritableBytes(available); - } else if (!byteBuf.writable()) { - // FIXME: Magic number - byteBuf.ensureWritableBytes(4096); + private boolean expandReadBuffer(ByteBuf byteBuf) { + final int maxCapacity = byteBuf.maxCapacity(); + final int capacity = byteBuf.capacity(); + if (capacity == maxCapacity) { + return false; } + + final int available = available(); + final int writerIndex = byteBuf.writerIndex(); + if (available > 0) { + if (writerIndex + available > maxCapacity) { + byteBuf.capacity(maxCapacity); + } else { + byteBuf.ensureWritableBytes(available); + } + return true; + } + + if (writerIndex != capacity) { + return false; + } + + // FIXME: magic number + final int increment = 4096; + if (writerIndex + increment > maxCapacity) { + byteBuf.capacity(maxCapacity); + } else { + byteBuf.ensureWritableBytes(increment); + } + return true; } }