Introduce a FrameDecoder.setMaxCumulationBufferCapacity(..) setter which allows to configure how bug the capacity of the cumulation buffer can be before the FrameDecoder tries to optimize memory usage with byte copies. Related to #390

This allows the users to set a threshold that matches best their needs. Use Integer.MAX_VALUE to disable copies at all at the cost of bigger memory usage.
This commit is contained in:
norman 2012-06-13 11:22:37 +02:00
parent cb8fc7af4a
commit df11cfab25
2 changed files with 158 additions and 81 deletions

View File

@ -20,6 +20,7 @@ import java.net.SocketAddress;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.buffer.CompositeChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
@ -181,6 +182,7 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler implemen
private final boolean unfold;
protected ChannelBuffer cumulation;
private volatile ChannelHandlerContext ctx;
private int copyThreshold;
protected FrameDecoder() {
this(false);
@ -190,6 +192,48 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler implemen
this.unfold = unfold;
}
/**
* See {@link #setMaxCumulationBufferCapacity(int)} for explaintation of this setting
*
*/
public final int getMaxCumulationBufferCapacity() {
return copyThreshold;
}
/**
* Set the maximal capacity of the internal cumulation ChannelBuffer to use
* before the {@link FrameDecoder} tries to minimize the memory usage by
* "byte copy".
*
*
* What you use here really depends on your application and need. Using
* {@link Integer#MAX_VALUE} will disable all byte copies but give you the
* cost of a higher memory usage if big {@link ChannelBuffer}'s will be
* received.
*
* By default a threshold of <code>0</code> is used, which means it will
* always copy to try to reduce memory usage
*
*
* @param copyThreshold
* the threshold (in bytes) or {@link Integer#MAX_VALUE} to
* disable it. The value must be at least 0
* @throws IllegalStateException
* get thrown if someone tries to change this setting after the
* Decoder was added to the {@link ChannelPipeline}
*/
public final void setMaxCumulationBufferCapacity(int copyThreshold) {
if (copyThreshold < 0) {
throw new IllegalArgumentException("MaxCumulationBufferCapacity must be >= 0");
}
if (ctx == null) {
this.copyThreshold = copyThreshold;
} else {
throw new IllegalStateException("MaxCumulationBufferCapacity " +
"can only be changed before the Decoder was added to the ChannelPipeline");
}
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) throws Exception {
@ -213,43 +257,35 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler implemen
// the cumulation buffer is not created yet so just pass the input to callDecode(...) method
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
} finally {
if (input.readable()) {
int readable = input.readableBytes();
if (readable > 0) {
int cap = input.capacity();
// check if readableBytes == capacity we can safe the copy as we will not be able to
// optimize memory usage anyway
if (readable != cap && cap > copyThreshold) {
// seems like there is something readable left in the input buffer. So create
// the cumulation buffer and copy the input into it
(cumulation = newCumulationBuffer(ctx, input.readableBytes())).writeBytes(input);
cumulation = newCumulationBuffer(ctx, input.readableBytes());
cumulation.writeBytes(input);
} else {
// just use the input as cumulation buffer for now
cumulation = input;
}
}
}
} else {
assert cumulation.readable();
boolean fit = false;
int readable = input.readableBytes();
int writable = cumulation.writableBytes();
int w = writable - readable;
if (w < 0) {
int readerIndex = cumulation.readerIndex();
if (w + readerIndex >= 0) {
// the input will fit if we discard all read bytes, so do it
cumulation.discardReadBytes();
fit = true;
}
} else {
// ok the input fit into the cumulation buffer
fit = true;
}
ChannelBuffer buf;
if (fit) {
// the input fit in the cumulation buffer so copy it over
buf = cumulation;
buf.writeBytes(input);
} else {
// wrap the cumulation and input
buf = ChannelBuffers.wrappedBuffer(cumulation, input);
//
// We use a CompositeBuffer all the time as its always faster the
// byte-copy if the wrapped buffer count == 2
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(cumulation, input);
cumulation = buf;
}
// Wrap in try / finally.
//
@ -257,14 +293,35 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler implemen
try {
callDecode(ctx, e.getChannel(), buf, e.getRemoteAddress());
} finally {
if (!buf.readable()) {
int readable = buf.readableBytes();
if (readable == 0) {
// nothing readable left so reset the state
cumulation = null;
} else {
// create a new buffer and copy the readable buffer into it
int cap = buf.capacity();
if (readable != cap && cap > copyThreshold) {
// the readable bytes are > as the configured
// copyThreshold, so create a new buffer and copy the
// bytes into it
cumulation = newCumulationBuffer(ctx, buf.readableBytes());
cumulation.writeBytes(buf);
} else {
if (readable == cap) {
cumulation = buf;
} else {
// create a new cumulation buffer that holds the
// unwrapped parts of the CompositeChannelBuffer
// that are not read yet.
cumulation = ChannelBuffers.wrappedBuffer(((CompositeChannelBuffer) buf)
.decompose(buf.readerIndex(), buf.readableBytes())
.toArray(new ChannelBuffer[0]));
}
}
}
}

View File

@ -19,6 +19,7 @@ import java.net.SocketAddress;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.buffer.CompositeChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
@ -439,26 +440,51 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
input, replayable,
e.getRemoteAddress());
} finally {
if (input.readable()) {
int readable = input.readableBytes();
if (readable > 0) {
int cap = input.capacity();
boolean copy = false;
// check if readableBytes == capacity we can safe the copy as we will not be able to
// optimize memory usage anyway
if (readable != cap && cap > getMaxCumulationBufferCapacity()) {
copy = true;
}
// seems like there is something readable left in the input buffer
// or decoder wants a replay - create the cumulation buffer and
// copy the input into it
ChannelBuffer cumulation;
if (checkpoint > 0) {
int bytesToPreserve = inputSize - (checkpoint - oldReaderIndex);
if (copy) {
cumulation = this.cumulation =
newCumulationBuffer(ctx, bytesToPreserve);
cumulation.writeBytes(input, checkpoint, bytesToPreserve);
} else {
cumulation = this.cumulation =
input.slice(checkpoint, bytesToPreserve);
}
} else if (checkpoint == 0) {
if (copy) {
cumulation = this.cumulation =
newCumulationBuffer(ctx, inputSize);
cumulation.writeBytes(input, oldReaderIndex, inputSize);
cumulation.readerIndex(input.readerIndex());
} else {
cumulation = this.cumulation =
input.slice(oldReaderIndex, inputSize);
cumulation.readerIndex(input.readerIndex());
}
} else {
if (copy) {
cumulation = this.cumulation =
newCumulationBuffer(ctx, input.readableBytes());
cumulation.writeBytes(input);
} else {
cumulation = this.cumulation =
input;
}
}
replayable = new ReplayingDecoderBuffer(cumulation);
} else {
@ -469,35 +495,10 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
} else {
assert cumulation.readable();
boolean fit = false;
int readable = input.readableBytes();
int writable = cumulation.writableBytes();
int w = writable - readable;
if (w < 0) {
int readerIndex = cumulation.readerIndex();
if (w + readerIndex >= 0) {
// the input will fit if we discard all read bytes, so do it
cumulation.discardReadBytes();
fit = true;
}
} else {
// ok the input fit into the cumulation buffer
fit = true;
}
ChannelBuffer buf;
if (fit) {
// the input fit in the cumulation buffer so copy it over
buf = cumulation;
buf.writeBytes(input);
} else {
// wrap the cumulation and input
buf = ChannelBuffers.wrappedBuffer(cumulation, input);
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(cumulation, input);
cumulation = buf;
replayable = new ReplayingDecoderBuffer(cumulation);
}
// Wrap in try / finally.
//
@ -505,16 +506,35 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
try {
callDecode(ctx, e.getChannel(), buf, replayable, e.getRemoteAddress());
} finally {
if (!buf.readable()) {
int readable = buf.readableBytes();
if (readable == 0) {
// nothing readable left so reset the state
cumulation = null;
replayable = ReplayingDecoderBuffer.EMPTY_BUFFER;
} else {
// create a new buffer and copy the readable buffer into it
int cap = buf.capacity();
if (readable != cap && cap > getMaxCumulationBufferCapacity()) {
// the readable bytes are > as the configured
// copyThreshold, so create a new buffer and copy the
// bytes into it
cumulation = newCumulationBuffer(ctx, buf.readableBytes());
cumulation.writeBytes(buf);
replayable = new ReplayingDecoderBuffer(cumulation);
} else {
if (readable == cap) {
cumulation = buf;
} else {
// create a new cumulation buffer that holds the
// unwrapped parts of the CompositeChannelBuffer
// that are not read yet.
cumulation = ChannelBuffers.wrappedBuffer(((CompositeChannelBuffer) buf)
.decompose(buf.readerIndex(), buf.readableBytes())
.toArray(new ChannelBuffer[0]));
}
}
replayable = new ReplayingDecoderBuffer(cumulation);
}
}