Make the cumulation usage more memory efficient. See #280

This commit is contained in:
Norman Maurer 2012-04-24 20:41:10 +02:00
parent 6e2e9fb3c5
commit 476cf97b97
3 changed files with 82 additions and 25 deletions

View File

@ -17,7 +17,6 @@ package io.netty.handler.codec.frame;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferFactory; import io.netty.buffer.ChannelBufferFactory;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -78,8 +77,7 @@ public class FixedLengthFrameDecoder extends FrameDecoder {
protected ChannelBuffer newCumulationBuffer(ChannelHandlerContext ctx, int minimumCapacity) { protected ChannelBuffer newCumulationBuffer(ChannelHandlerContext ctx, int minimumCapacity) {
ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory(); ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
if (allocateFullBuffer) { if (allocateFullBuffer) {
return ChannelBuffers.dynamicBuffer( return factory.getBuffer(frameLength);
factory.getDefaultOrder(), frameLength, ctx.getChannel().getConfig().getBufferFactory());
} }
return super.newCumulationBuffer(ctx, minimumCapacity); return super.newCumulationBuffer(ctx, minimumCapacity);
} }

View File

@ -210,15 +210,47 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
(this.cumulation = newCumulationBuffer(ctx, input.readableBytes())).writeBytes(input); (this.cumulation = newCumulationBuffer(ctx, input.readableBytes())).writeBytes(input);
} }
} else { } else {
ChannelBuffer cumulation = this.cumulation;
assert cumulation.readable(); assert cumulation.readable();
if (cumulation.writableBytes() < input.readableBytes()) { boolean fit = false;
cumulation.discardReadBytes();
int readable = input.readableBytes();
int writable = cumulation.writableBytes();
int w = writable - readable;
if (w < 0) {
int readerIndex = cumulation.readerIndex();
if (w + readerIndex >= 0) {
// the input will fit if we discard all read bytes, so do it
cumulation.discardReadBytes();
fit = true;
}
} else {
// ok the input fit into the cumulation buffer
fit = true;
} }
cumulation.writeBytes(input);
callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
if (!cumulation.readable()) { ChannelBuffer buf;
if (fit) {
// the input fit in the cumulation buffer so copy it over
buf = this.cumulation;
buf.writeBytes(input);
} else {
// wrap the cumulation and input
buf = ChannelBuffers.wrappedBuffer(cumulation, input);
this.cumulation = buf;
}
callDecode(ctx, e.getChannel(), buf, e.getRemoteAddress());
if (!buf.readable()) {
// nothing readable left so reset the state
this.cumulation = null; this.cumulation = null;
} else {
// create a new buffer and copy the readable buffer into it
this.cumulation = newCumulationBuffer(ctx, buf.readableBytes());
this.cumulation.writeBytes(buf);
} }
} }
} }
@ -350,9 +382,7 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
/** /**
* Create a new {@link ChannelBuffer} which is used for the cumulation. * Create a new {@link ChannelBuffer} which is used for the cumulation.
* Be aware that this MUST be a dynamic buffer. Sub-classes may override * Sub-classes may override this.
* this to provide a dynamic {@link ChannelBuffer} which has some
* pre-allocated size that better fit their need.
* *
* @param ctx {@link ChannelHandlerContext} for this handler * @param ctx {@link ChannelHandlerContext} for this handler
* @return buffer the {@link ChannelBuffer} which is used for cumulation * @return buffer the {@link ChannelBuffer} which is used for cumulation
@ -360,7 +390,6 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
protected ChannelBuffer newCumulationBuffer( protected ChannelBuffer newCumulationBuffer(
ChannelHandlerContext ctx, int minimumCapacity) { ChannelHandlerContext ctx, int minimumCapacity) {
ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory(); ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
return ChannelBuffers.dynamicBuffer( return factory.getBuffer(Math.max(minimumCapacity, 256));
factory.getDefaultOrder(), Math.max(minimumCapacity, 256), factory);
} }
} }

View File

@ -473,16 +473,49 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
replayable = ReplayingDecoderBuffer.EMPTY_BUFFER; replayable = ReplayingDecoderBuffer.EMPTY_BUFFER;
} }
} else { } else {
ChannelBuffer cumulation = this.cumulation;
assert cumulation.readable(); assert cumulation.readable();
if (cumulation.writableBytes() < input.readableBytes()) { boolean fit = false;
cumulation.discardReadBytes();
int readable = input.readableBytes();
int writable = cumulation.writableBytes();
int w = writable - readable;
if (w < 0) {
int readerIndex = cumulation.readerIndex();
if (w + readerIndex >= 0) {
// the input will fit if we discard all read bytes, so do it
cumulation.discardReadBytes();
fit = true;
}
} else {
// ok the input fit into the cumulation buffer
fit = true;
} }
cumulation.writeBytes(input);
callDecode(ctx, e.getChannel(), cumulation, replayable, e.getRemoteAddress()); ChannelBuffer buf;
if (!cumulation.readable()) { if (fit) {
// the input fit in the cumulation buffer so copy it over
buf = this.cumulation;
buf.writeBytes(input);
} else {
// wrap the cumulation and input
buf = ChannelBuffers.wrappedBuffer(cumulation, input);
this.cumulation = buf;
replayable = new ReplayingDecoderBuffer(cumulation);
}
callDecode(ctx, e.getChannel(), buf, replayable, e.getRemoteAddress());
if (!buf.readable()) {
// nothing readable left so reset the state
this.cumulation = null; this.cumulation = null;
replayable = ReplayingDecoderBuffer.EMPTY_BUFFER; replayable = ReplayingDecoderBuffer.EMPTY_BUFFER;
} else {
// create a new buffer and copy the readable buffer into it
this.cumulation = newCumulationBuffer(ctx, buf.readableBytes());
this.cumulation.writeBytes(buf);
replayable = new ReplayingDecoderBuffer(this.cumulation);
} }
} }
} }
@ -605,9 +638,7 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
/** /**
* Create a new {@link ChannelBuffer} which is used for the cumulation. * Create a new {@link ChannelBuffer} which is used for the cumulation.
* Be aware that this MUST be a dynamic buffer. Sub-classes may override * Sub-classes may override this.
* this to provide a dynamic {@link ChannelBuffer} which has some
* pre-allocated size that better fit their need.
* *
* @param ctx {@link ChannelHandlerContext} for this handler * @param ctx {@link ChannelHandlerContext} for this handler
* @return buffer the {@link ChannelBuffer} which is used for cumulation * @return buffer the {@link ChannelBuffer} which is used for cumulation
@ -615,7 +646,6 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
protected ChannelBuffer newCumulationBuffer( protected ChannelBuffer newCumulationBuffer(
ChannelHandlerContext ctx, int minimumCapacity) { ChannelHandlerContext ctx, int minimumCapacity) {
ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory(); ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
return ChannelBuffers.dynamicBuffer( return factory.getBuffer(Math.max(minimumCapacity, 256));
factory.getDefaultOrder(), Math.max(minimumCapacity, 256), factory);
} }
} }