Move utility method to abstract base class and correctly handle expand of buffer also for OIO

This commit is contained in:
Norman Maurer 2012-12-31 22:09:27 +01:00
parent 89a16fe01e
commit 5161ca733c
4 changed files with 53 additions and 79 deletions

View File

@ -338,6 +338,39 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return pipeline.sendFile(region, promise);
}
// 0 - not expanded because the buffer is writable
// 1 - expanded because the buffer was not writable
// 2 - could not expand because the buffer was at its maximum although the buffer is not writable.
protected static int expandReadBuffer(ByteBuf byteBuf) {
final int writerIndex = byteBuf.writerIndex();
final int capacity = byteBuf.capacity();
if (capacity != writerIndex) {
return 0;
}
final int maxCapacity = byteBuf.maxCapacity();
if (capacity == maxCapacity) {
if (byteBuf.readerIndex() != 0) {
byteBuf.discardReadBytes();
return 0;
}
return 2;
}
// FIXME: Magic number
final int increment = 4096;
if (writerIndex + increment > maxCapacity) {
// Expand to maximum capacity.
byteBuf.capacity(maxCapacity);
} else {
// Expand by the increment.
byteBuf.ensureWritableBytes(increment);
}
return 1;
}
/**
* Create a new {@link AbstractUnsafe} instance which will be used for the life-time of the {@link Channel}
*/

View File

@ -202,30 +202,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
return null;
}
private static void expandReadBuffer(ByteBuf byteBuf) {
final int writerIndex = byteBuf.writerIndex();
final int capacity = byteBuf.capacity();
if (capacity != writerIndex) {
return;
}
final int maxCapacity = byteBuf.maxCapacity();
if (capacity == maxCapacity) {
return;
}
// FIXME: Magic number
final int increment = 4096;
if (writerIndex + increment > maxCapacity) {
// Expand to maximum capacity.
byteBuf.capacity(maxCapacity);
} else {
// Expand by the increment.
byteBuf.ensureWritableBytes(increment);
}
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().bind(localAddress);

View File

@ -238,36 +238,4 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
*/
protected abstract int doWriteBytes(ByteBuf buf, boolean lastSpin) throws Exception;
// 0 - not expanded because the buffer is writable
// 1 - expanded because the buffer was not writable
// 2 - could not expand because the buffer was at its maximum although the buffer is not writable.
private static int expandReadBuffer(ByteBuf byteBuf) {
final int writerIndex = byteBuf.writerIndex();
final int capacity = byteBuf.capacity();
if (capacity != writerIndex) {
return 0;
}
final int maxCapacity = byteBuf.maxCapacity();
if (capacity == maxCapacity) {
if (byteBuf.readerIndex() != 0) {
byteBuf.discardReadBytes();
return 0;
}
return 2;
}
// FIXME: Magic number
final int increment = 4096;
if (writerIndex + increment > maxCapacity) {
// Expand to maximum capacity.
byteBuf.capacity(maxCapacity);
} else {
// Expand by the increment.
byteBuf.ensureWritableBytes(increment);
}
return 1;
}
}

View File

@ -58,12 +58,14 @@ abstract class AbstractOioByteChannel extends AbstractOioChannel {
boolean read = false;
boolean firedInboundBufferSuspeneded = false;
try {
for (;;) {
expandReadBuffer(byteBuf);
loop: for (;;) {
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount < 0) {
closed = true;
break;
}
final int available = available();
@ -71,29 +73,24 @@ abstract class AbstractOioByteChannel extends AbstractOioChannel {
break;
}
if (byteBuf.writable()) {
continue;
}
final int capacity = byteBuf.capacity();
final int maxCapacity = byteBuf.maxCapacity();
if (capacity == maxCapacity) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
if (!byteBuf.writable()) {
throw new IllegalStateException(
"an inbound handler whose buffer is full must consume at " +
"least one byte.");
switch (expandReadBuffer(byteBuf)) {
case 0:
// Read all - stop reading.
break loop;
case 1:
// Keep reading until everything is read.
break;
case 2:
// Let the inbound handler drain the buffer and continue reading.
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
if (!byteBuf.writable()) {
throw new IllegalStateException(
"an inbound handler whose buffer is full must consume at " +
"least one byte.");
}
}
}
} else {
final int writerIndex = byteBuf.writerIndex();
if (writerIndex + available > maxCapacity) {
byteBuf.capacity(maxCapacity);
} else {
byteBuf.ensureWritableBytes(available);
}
}
}
} catch (Throwable t) {