diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java index ef4ebbe207..ea0d809029 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -141,31 +141,28 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne }; } - private static boolean expandReadBuffer(ByteBuf byteBuf) { - final int maxCapacity = byteBuf.maxCapacity(); + private static void expandReadBuffer(ByteBuf byteBuf) { + final int writerIndex = byteBuf.writerIndex(); final int capacity = byteBuf.capacity(); + if (capacity != writerIndex) { + return; + } + + final int maxCapacity = byteBuf.maxCapacity(); if (capacity == maxCapacity) { - return false; + return; } // FIXME: Magic number final int increment = 4096; - final int writerIndex = byteBuf.writerIndex(); - if (writerIndex != capacity) { - // No need to expand because there's a room in the buffer. - return false; - } - - // Expand to maximum capacity. if (writerIndex + increment > maxCapacity) { + // Expand to maximum capacity. byteBuf.capacity(maxCapacity); - return true; + } else { + // Expand by the increment. + byteBuf.ensureWritableBytes(increment); } - - // Expand by the increment. - byteBuf.ensureWritableBytes(increment); - return true; } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java index 5b5ed33aae..0c34a19995 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java @@ -44,7 +44,7 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel { boolean read = false; try { expandReadBuffer(byteBuf); - for (;;) { + loop: for (;;) { int localReadAmount = doReadBytes(byteBuf); if (localReadAmount > 0) { read = true; @@ -52,8 +52,25 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel { closed = true; break; } - if (!expandReadBuffer(byteBuf)) { + + switch (expandReadBuffer(byteBuf)) { + case 0: + // Read all - stop reading. + break loop; + case 1: + // Keep reading until everything is read. break; + case 2: + // Let the inbound handler drain the buffer and continue reading. + if (read) { + read = false; + pipeline.fireInboundBufferUpdated(); + if (!byteBuf.writable()) { + throw new IllegalStateException( + "an inbound handler whose buffer is full must consume at " + + "least one byte."); + } + } } } } catch (Throwable t) { @@ -100,30 +117,32 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel { protected abstract int doReadBytes(ByteBuf buf) throws Exception; protected abstract int doWriteBytes(ByteBuf buf, boolean lastSpin) throws Exception; - private static boolean expandReadBuffer(ByteBuf byteBuf) { - final int maxCapacity = byteBuf.maxCapacity(); + // 0 - not expanded because the buffer is writable + // 1 - expanded because the buffer was not writable + // 2 - could not expand because the buffer was at its maximum although the buffer is not writable. + private static int expandReadBuffer(ByteBuf byteBuf) { + final int writerIndex = byteBuf.writerIndex(); final int capacity = byteBuf.capacity(); + if (capacity != writerIndex) { + return 0; + } + + final int maxCapacity = byteBuf.maxCapacity(); if (capacity == maxCapacity) { - return false; + return 2; } // FIXME: Magic number final int increment = 4096; - final int writerIndex = byteBuf.writerIndex(); - if (writerIndex != capacity) { - // No need to expand because there's a room in the buffer. - return false; - } - - // Expand to maximum capacity. if (writerIndex + increment > maxCapacity) { + // Expand to maximum capacity. byteBuf.capacity(maxCapacity); - return true; + } else { + // Expand by the increment. + byteBuf.ensureWritableBytes(increment); } - // Expand by the increment. - byteBuf.ensureWritableBytes(increment); - return true; + return 1; } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java index ba9ac0630a..f818d6e6a5 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java @@ -40,12 +40,43 @@ abstract class AbstractOioByteChannel extends AbstractOioChannel { boolean closed = false; boolean read = false; try { - expandReadBuffer(byteBuf); - int localReadAmount = doReadBytes(byteBuf); - if (localReadAmount > 0) { - read = true; - } else if (localReadAmount < 0) { - closed = true; + for (;;) { + int localReadAmount = doReadBytes(byteBuf); + if (localReadAmount > 0) { + read = true; + } else if (localReadAmount < 0) { + closed = true; + } + + final int available = available(); + if (available <= 0) { + break; + } + + if (byteBuf.writable()) { + continue; + } + + final int capacity = byteBuf.capacity(); + final int maxCapacity = byteBuf.maxCapacity(); + if (capacity == maxCapacity) { + if (read) { + read = false; + pipeline.fireInboundBufferUpdated(); + if (!byteBuf.writable()) { + throw new IllegalStateException( + "an inbound handler whose buffer is full must consume at " + + "least one byte."); + } + } + } else { + final int writerIndex = byteBuf.writerIndex(); + if (writerIndex + available > maxCapacity) { + byteBuf.capacity(maxCapacity); + } else { + byteBuf.ensureWritableBytes(available); + } + } } } catch (Throwable t) { if (read) { @@ -65,38 +96,6 @@ abstract class AbstractOioByteChannel extends AbstractOioChannel { } } } - - private boolean expandReadBuffer(ByteBuf byteBuf) { - final int maxCapacity = byteBuf.maxCapacity(); - final int capacity = byteBuf.capacity(); - if (capacity == maxCapacity) { - return false; - } - - final int available = available(); - final int writerIndex = byteBuf.writerIndex(); - if (available > 0) { - if (writerIndex + available > maxCapacity) { - byteBuf.capacity(maxCapacity); - } else { - byteBuf.ensureWritableBytes(available); - } - return true; - } - - if (writerIndex != capacity) { - return false; - } - - // FIXME: magic number - final int increment = 4096; - if (writerIndex + increment > maxCapacity) { - byteBuf.capacity(maxCapacity); - } else { - byteBuf.ensureWritableBytes(increment); - } - return true; - } } @Override