Add ByteBuf.Unsafe.discardSomeReadBytes() to reduce discardReadBytes()

This commit is contained in:
Trustin Lee 2012-08-08 17:34:00 +09:00
parent b8a60dddd3
commit a2aadef4da
13 changed files with 63 additions and 17 deletions

View File

@ -1808,6 +1808,14 @@ public interface ByteBuf extends ChannelBuf, Comparable<ByteBuf> {
*/
ByteBuf newBuffer(int initialCapacity);
/**
* Similar to {@link ByteBuf#discardReadBytes()} except that this method might discard
* some, all, or none of read bytes depending on its internal implementation to reduce
* overall memory bandwidth consumption at the cost of potentially additional memory
* consumption.
*/
void discardSomeReadBytes();
/**
* Increases the reference count of the buffer.
*/

View File

@ -1201,6 +1201,11 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
return buf;
}
@Override
public void discardSomeReadBytes() {
discardReadComponents();
}
@Override
public void acquire() {
if (refCnt <= 0) {

View File

@ -413,6 +413,19 @@ public class DirectByteBuf extends AbstractByteBuf {
return new DirectByteBuf(initialCapacity, Math.max(initialCapacity, maxCapacity()));
}
@Override
public void discardSomeReadBytes() {
final int readerIndex = readerIndex();
if (readerIndex == writerIndex()) {
discardReadBytes();
return;
}
if (readerIndex > 0 && readerIndex >= capacity >>> 1) {
discardReadBytes();
}
}
@Override
public void acquire() {
if (refCnt <= 0) {

View File

@ -228,6 +228,11 @@ public class DuplicatedByteBuf extends AbstractByteBuf implements WrappedByteBuf
return buffer.unsafe().newBuffer(initialCapacity);
}
@Override
public void discardSomeReadBytes() {
throw new UnsupportedOperationException();
}
@Override
public void acquire() {
buffer.unsafe().acquire();

View File

@ -302,6 +302,19 @@ public class HeapByteBuf extends AbstractByteBuf {
return new HeapByteBuf(initialCapacity, Math.max(initialCapacity, maxCapacity()));
}
@Override
public void discardSomeReadBytes() {
final int readerIndex = readerIndex();
if (readerIndex == writerIndex()) {
discardReadBytes();
return;
}
if (readerIndex > 0 && readerIndex >= capacity() >>> 1) {
discardReadBytes();
}
}
@Override
public void acquire() {
if (refCnt <= 0) {

View File

@ -295,6 +295,11 @@ public class SlicedByteBuf extends AbstractByteBuf implements WrappedByteBuf {
return buffer.unsafe().newBuffer(initialCapacity);
}
@Override
public void discardSomeReadBytes() {
throw new UnsupportedOperationException();
}
@Override
public void acquire() {
buffer.unsafe().acquire();

View File

@ -46,7 +46,6 @@ public abstract class ByteToByteDecoder extends ChannelInboundByteHandlerAdapter
}
if (out.readableBytes() > oldOutSize) {
in.discardReadBytes();
ctx.fireInboundBufferUpdated();
}
@ -71,8 +70,8 @@ public abstract class ByteToByteDecoder extends ChannelInboundByteHandlerAdapter
}
}
in.unsafe().discardSomeReadBytes();
if (out.readableBytes() > oldOutSize) {
in.discardReadBytes();
ctx.fireInboundBufferUpdated();
}
}

View File

@ -44,10 +44,7 @@ public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapte
}
}
if (out.readableBytes() > oldOutSize) {
in.discardReadBytes();
}
in.unsafe().discardSomeReadBytes();
ctx.flush(future);
}

View File

@ -52,7 +52,6 @@ public abstract class ByteToMessageDecoder<O>
try {
if (CodecUtil.unfoldAndAdd(ctx, decodeLast(ctx, in), true)) {
in.discardReadBytes();
ctx.fireInboundBufferUpdated();
}
} catch (Throwable t) {
@ -93,9 +92,10 @@ public abstract class ByteToMessageDecoder<O>
break;
}
} catch (Throwable t) {
in.unsafe().discardSomeReadBytes();
if (decoded) {
decoded = false;
in.discardReadBytes();
ctx.fireInboundBufferUpdated();
}
@ -107,8 +107,9 @@ public abstract class ByteToMessageDecoder<O>
}
}
in.unsafe().discardSomeReadBytes();
if (decoded) {
in.discardReadBytes();
ctx.fireInboundBufferUpdated();
}
}

View File

@ -455,8 +455,10 @@ public abstract class ReplayingDecoder<O, S> extends ByteToMessageDecoder<O> {
}
private void fireInboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
checkpoint -= in.readerIndex();
in.discardReadBytes();
final int oldReaderIndex = in.readerIndex();
in.unsafe().discardSomeReadBytes();
final int newReaderIndex = in.readerIndex();
checkpoint -= oldReaderIndex - newReaderIndex;
ctx.fireInboundBufferUpdated();
}
}

View File

@ -100,7 +100,7 @@ public class CompatibleObjectEncoder extends MessageToByteEncoder<Object> {
oos.reset();
// Also discard the byproduct to avoid OOM on the sending side.
out.discardReadBytes();
out.unsafe().discardSomeReadBytes();
}
}

View File

@ -336,7 +336,7 @@ public class SslHandler
final ByteBuf in = ctx.outboundByteBuffer();
final ByteBuf out = ctx.nextOutboundByteBuffer();
out.discardReadBytes();
out.unsafe().discardSomeReadBytes();
// Do not encrypt the first write request if this handler is
// created with startTLS flag turned on.
@ -398,9 +398,7 @@ public class SslHandler
setHandshakeFailure(e);
throw e;
} finally {
if (bytesProduced > 0) {
in.discardReadBytes();
}
in.unsafe().discardSomeReadBytes();
ctx.flush(future);
}
}

View File

@ -441,7 +441,7 @@ public class LocalTransportThreadModelTest {
out.add(msg);
}
}
in.discardReadBytes();
in.unsafe().discardSomeReadBytes();
if (swallow) {
future.setSuccess();
} else {