[#530] Allow using a bounded ByteBuf as the first inbound buffer

- Clean up
- Do not stop reading when reached at maxCapacity.
  - Just let handler drain the buffer and try again quickly.
- No more magic number in OIO buffer expansion
This commit is contained in:
Trustin Lee 2012-08-19 14:48:56 +09:00
parent 9e75a33d3d
commit 44daa99d3f
3 changed files with 84 additions and 69 deletions

View File

@ -141,31 +141,28 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
};
}
private static boolean expandReadBuffer(ByteBuf byteBuf) {
final int maxCapacity = byteBuf.maxCapacity();
private static void expandReadBuffer(ByteBuf byteBuf) {
final int writerIndex = byteBuf.writerIndex();
final int capacity = byteBuf.capacity();
if (capacity != writerIndex) {
return;
}
final int maxCapacity = byteBuf.maxCapacity();
if (capacity == maxCapacity) {
return false;
return;
}
// FIXME: Magic number
final int increment = 4096;
final int writerIndex = byteBuf.writerIndex();
if (writerIndex != capacity) {
// No need to expand because there's a room in the buffer.
return false;
}
// Expand to maximum capacity.
if (writerIndex + increment > maxCapacity) {
// Expand to maximum capacity.
byteBuf.capacity(maxCapacity);
return true;
} else {
// Expand by the increment.
byteBuf.ensureWritableBytes(increment);
}
// Expand by the increment.
byteBuf.ensureWritableBytes(increment);
return true;
}
@Override

View File

@ -44,7 +44,7 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel {
boolean read = false;
try {
expandReadBuffer(byteBuf);
for (;;) {
loop: for (;;) {
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount > 0) {
read = true;
@ -52,8 +52,25 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel {
closed = true;
break;
}
if (!expandReadBuffer(byteBuf)) {
switch (expandReadBuffer(byteBuf)) {
case 0:
// Read all - stop reading.
break loop;
case 1:
// Keep reading until everything is read.
break;
case 2:
// Let the inbound handler drain the buffer and continue reading.
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
if (!byteBuf.writable()) {
throw new IllegalStateException(
"an inbound handler whose buffer is full must consume at " +
"least one byte.");
}
}
}
}
} catch (Throwable t) {
@ -100,30 +117,32 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel {
protected abstract int doReadBytes(ByteBuf buf) throws Exception;
protected abstract int doWriteBytes(ByteBuf buf, boolean lastSpin) throws Exception;
private static boolean expandReadBuffer(ByteBuf byteBuf) {
final int maxCapacity = byteBuf.maxCapacity();
// 0 - not expanded because the buffer is writable
// 1 - expanded because the buffer was not writable
// 2 - could not expand because the buffer was at its maximum although the buffer is not writable.
private static int expandReadBuffer(ByteBuf byteBuf) {
final int writerIndex = byteBuf.writerIndex();
final int capacity = byteBuf.capacity();
if (capacity != writerIndex) {
return 0;
}
final int maxCapacity = byteBuf.maxCapacity();
if (capacity == maxCapacity) {
return false;
return 2;
}
// FIXME: Magic number
final int increment = 4096;
final int writerIndex = byteBuf.writerIndex();
if (writerIndex != capacity) {
// No need to expand because there's a room in the buffer.
return false;
}
// Expand to maximum capacity.
if (writerIndex + increment > maxCapacity) {
// Expand to maximum capacity.
byteBuf.capacity(maxCapacity);
return true;
} else {
// Expand by the increment.
byteBuf.ensureWritableBytes(increment);
}
// Expand by the increment.
byteBuf.ensureWritableBytes(increment);
return true;
return 1;
}
}

View File

@ -40,12 +40,43 @@ abstract class AbstractOioByteChannel extends AbstractOioChannel {
boolean closed = false;
boolean read = false;
try {
expandReadBuffer(byteBuf);
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount < 0) {
closed = true;
for (;;) {
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount < 0) {
closed = true;
}
final int available = available();
if (available <= 0) {
break;
}
if (byteBuf.writable()) {
continue;
}
final int capacity = byteBuf.capacity();
final int maxCapacity = byteBuf.maxCapacity();
if (capacity == maxCapacity) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
if (!byteBuf.writable()) {
throw new IllegalStateException(
"an inbound handler whose buffer is full must consume at " +
"least one byte.");
}
}
} else {
final int writerIndex = byteBuf.writerIndex();
if (writerIndex + available > maxCapacity) {
byteBuf.capacity(maxCapacity);
} else {
byteBuf.ensureWritableBytes(available);
}
}
}
} catch (Throwable t) {
if (read) {
@ -65,38 +96,6 @@ abstract class AbstractOioByteChannel extends AbstractOioChannel {
}
}
}
private boolean expandReadBuffer(ByteBuf byteBuf) {
final int maxCapacity = byteBuf.maxCapacity();
final int capacity = byteBuf.capacity();
if (capacity == maxCapacity) {
return false;
}
final int available = available();
final int writerIndex = byteBuf.writerIndex();
if (available > 0) {
if (writerIndex + available > maxCapacity) {
byteBuf.capacity(maxCapacity);
} else {
byteBuf.ensureWritableBytes(available);
}
return true;
}
if (writerIndex != capacity) {
return false;
}
// FIXME: magic number
final int increment = 4096;
if (writerIndex + increment > maxCapacity) {
byteBuf.capacity(maxCapacity);
} else {
byteBuf.ensureWritableBytes(increment);
}
return true;
}
}
@Override