Optimize FrameDecoder and ReplayingDecoder

* Overall code cleanup on FrameDecoder and ReplayingDecoder
* FrameDecoder discards readableBytes only when it has to
* Replaced createCumulationDynamicBuffer with newCumulationBuffer with
  an additional hint
* ReplayingDecoder does not perform memory copy if possible
This commit is contained in:
Trustin Lee 2012-02-28 16:30:00 -08:00
parent 2f6d02da60
commit c2d2f0b254
4 changed files with 91 additions and 81 deletions

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.frame; package io.netty.handler.codec.frame;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferFactory;
import io.netty.buffer.ChannelBuffers; import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -74,11 +75,13 @@ public class FixedLengthFrameDecoder extends FrameDecoder {
} }
@Override @Override
protected ChannelBuffer createCumulationDynamicBuffer(ChannelHandlerContext ctx) { protected ChannelBuffer newCumulationBuffer(ChannelHandlerContext ctx, int minimumCapacity) {
ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
if (allocateFullBuffer) { if (allocateFullBuffer) {
return ChannelBuffers.dynamicBuffer(frameLength, ctx.getChannel().getConfig().getBufferFactory()); return ChannelBuffers.dynamicBuffer(
factory.getDefaultOrder(), frameLength, ctx.getChannel().getConfig().getBufferFactory());
} }
return super.createCumulationDynamicBuffer(ctx); return super.newCumulationBuffer(ctx, minimumCapacity);
} }
} }

View File

@ -18,6 +18,7 @@ package io.netty.handler.codec.frame;
import java.net.SocketAddress; import java.net.SocketAddress;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferFactory;
import io.netty.buffer.ChannelBuffers; import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
@ -206,25 +207,22 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
if (input.readable()) { if (input.readable()) {
// seems like there is something readable left in the input buffer. So create the cumulation buffer and copy the input into it // seems like there is something readable left in the input buffer. So create the cumulation buffer and copy the input into it
ChannelBuffer cumulation = cumulation(ctx); (this.cumulation = newCumulationBuffer(ctx, input.readableBytes())).writeBytes(input);
cumulation.writeBytes(input);
} }
} else { } else {
ChannelBuffer cumulation = cumulation(ctx); ChannelBuffer cumulation = this.cumulation;
if (cumulation.readable()) { assert cumulation.readable();
if (cumulation.writableBytes() < input.readableBytes()) {
cumulation.discardReadBytes(); cumulation.discardReadBytes();
}
cumulation.writeBytes(input); cumulation.writeBytes(input);
callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress()); callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
} else { if (!cumulation.readable()) {
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); this.cumulation = null;
if (input.readable()) {
cumulation.writeBytes(input);
} }
} }
} }
}
@Override @Override
public void channelDisconnected( public void channelDisconnected(
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
@ -303,10 +301,6 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
unfoldAndFireMessageReceived(context, remoteAddress, frame); unfoldAndFireMessageReceived(context, remoteAddress, frame);
} }
if (!cumulation.readable()) {
this.cumulation = null;
}
} }
private void unfoldAndFireMessageReceived(ChannelHandlerContext context, SocketAddress remoteAddress, Object result) { private void unfoldAndFireMessageReceived(ChannelHandlerContext context, SocketAddress remoteAddress, Object result) {
@ -333,10 +327,10 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
ChannelBuffer cumulation = this.cumulation; ChannelBuffer cumulation = this.cumulation;
if (cumulation == null) { if (cumulation == null) {
return; return;
} else {
this.cumulation = null;
} }
this.cumulation = null;
if (cumulation.readable()) { if (cumulation.readable()) {
// Make sure all frames are read before notifying a closed channel. // Make sure all frames are read before notifying a closed channel.
callDecode(ctx, ctx.getChannel(), cumulation, null); callDecode(ctx, ctx.getChannel(), cumulation, null);
@ -355,28 +349,18 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
} }
/** /**
* Get the currently used {@link ChannelBuffer} for cumulation or create one in a lazy fashion if none exist yet * Create a new {@link ChannelBuffer} which is used for the cumulation.
* * Be aware that this MUST be a dynamic buffer. Sub-classes may override
* @param ctx the {@link ChannelHandlerContext} for this handler * this to provide a dynamic {@link ChannelBuffer} which has some
* @return buffer the {@link ChannelBuffer} which is used for cumulation * pre-allocated size that better fit their need.
*/
private ChannelBuffer cumulation(ChannelHandlerContext ctx) {
ChannelBuffer c = cumulation;
if (c == null) {
c = createCumulationDynamicBuffer(ctx);
cumulation = c;
}
return c;
}
/**
* Create a new {@link ChannelBuffer} which is used for the cumulation. Be aware that this MUST be a dynamic buffer. Sub-classes may override this to provide a
* dynamic {@link ChannelBuffer} which has some prelocated size that better fit their need.
* *
* @param ctx {@link ChannelHandlerContext} for this handler * @param ctx {@link ChannelHandlerContext} for this handler
* @return buffer the {@link ChannelBuffer} which is used for cumulation * @return buffer the {@link ChannelBuffer} which is used for cumulation
*/ */
protected ChannelBuffer createCumulationDynamicBuffer(ChannelHandlerContext ctx) { protected ChannelBuffer newCumulationBuffer(
return ChannelBuffers.dynamicBuffer(ctx.getChannel().getConfig().getBufferFactory()); ChannelHandlerContext ctx, int minimumCapacity) {
ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
return ChannelBuffers.dynamicBuffer(
factory.getDefaultOrder(), minimumCapacity, factory);
} }
} }

View File

@ -18,7 +18,6 @@ package io.netty.handler.codec.replay;
import java.net.SocketAddress; import java.net.SocketAddress;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferFactory;
import io.netty.buffer.ChannelBuffers; import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
@ -289,7 +288,6 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
private ChannelBuffer cumulation; private ChannelBuffer cumulation;
private boolean needsCleanup;
private final boolean unfold; private final boolean unfold;
private ReplayingDecoderBuffer replayable; private ReplayingDecoderBuffer replayable;
private T state; private T state;
@ -430,11 +428,53 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
return; return;
} }
ChannelBuffer cumulation = cumulation(ctx); if (cumulation == null) {
needsCleanup = true; // the cumulation buffer is not created yet so just pass the input
cumulation.discardReadBytes(); // to callDecode(...) method
this.cumulation = input;
replayable = new ReplayingDecoderBuffer(input);
int oldReaderIndex = input.readerIndex();
int inputSize = input.readableBytes();
callDecode(
ctx, e.getChannel(),
input, replayable,
e.getRemoteAddress());
if (input.readable()) {
// seems like there is something readable left in the input buffer
// or decoder wants a replay - create the cumulation buffer and
// copy the input into it
if (checkpoint >= 0) {
ChannelBuffer cumulation = this.cumulation =
newCumulationBuffer(ctx, inputSize);
cumulation.writeBytes(input, oldReaderIndex, inputSize);
cumulation.readerIndex(input.readerIndex());
replayable = new ReplayingDecoderBuffer(cumulation);
} else {
System.out.println("B");
ChannelBuffer cumulation = this.cumulation =
newCumulationBuffer(ctx, input.readableBytes());
cumulation.writeBytes(input); cumulation.writeBytes(input);
callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress()); replayable = new ReplayingDecoderBuffer(cumulation);
}
} else {
this.cumulation = null;
replayable = ReplayingDecoderBuffer.EMPTY_BUFFER;
}
} else {
ChannelBuffer cumulation = this.cumulation;
assert cumulation.readable();
if (cumulation.writableBytes() < input.readableBytes()) {
cumulation.discardReadBytes();
}
cumulation.writeBytes(input);
callDecode(ctx, e.getChannel(), cumulation, replayable, e.getRemoteAddress());
if (!cumulation.readable()) {
this.cumulation = null;
replayable = ReplayingDecoderBuffer.EMPTY_BUFFER;
}
}
} }
@Override @Override
@ -455,15 +495,15 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
ctx.sendUpstream(e); ctx.sendUpstream(e);
} }
private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception { private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer input, ChannelBuffer replayableInput, SocketAddress remoteAddress) throws Exception {
while (cumulation.readable()) { while (input.readable()) {
int oldReaderIndex = checkpoint = cumulation.readerIndex(); int oldReaderIndex = checkpoint = input.readerIndex();
Object result = null; Object result = null;
T oldState = state; T oldState = state;
try { try {
result = decode(context, channel, replayable, state); result = decode(context, channel, replayableInput, state);
if (result == null) { if (result == null) {
if (oldReaderIndex == cumulation.readerIndex() && oldState == state) { if (oldReaderIndex == input.readerIndex() && oldState == state) {
throw new IllegalStateException( throw new IllegalStateException(
"null cannot be returned if no data is consumed and state didn't change."); "null cannot be returned if no data is consumed and state didn't change.");
} else { } else {
@ -476,7 +516,7 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
// Return to the checkpoint (or oldPosition) and retry. // Return to the checkpoint (or oldPosition) and retry.
int checkpoint = this.checkpoint; int checkpoint = this.checkpoint;
if (checkpoint >= 0) { if (checkpoint >= 0) {
cumulation.readerIndex(checkpoint); input.readerIndex(checkpoint);
} else { } else {
// Called by cleanup() - no need to maintain the readerIndex // Called by cleanup() - no need to maintain the readerIndex
// anymore because the buffer has been released already. // anymore because the buffer has been released already.
@ -489,7 +529,7 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
break; break;
} }
if (oldReaderIndex == cumulation.readerIndex() && oldState == state) { if (oldReaderIndex == input.readerIndex() && oldState == state) {
throw new IllegalStateException( throw new IllegalStateException(
"decode() method must consume at least one byte " + "decode() method must consume at least one byte " +
"if it returned a decoded message (caused by: " + "if it returned a decoded message (caused by: " +
@ -498,11 +538,6 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
// A successful decode // A successful decode
unfoldAndFireMessageReceived(context, result, remoteAddress); unfoldAndFireMessageReceived(context, result, remoteAddress);
if (!cumulation.readable()) {
this.cumulation = null;
replayable = ReplayingDecoderBuffer.EMPTY_BUFFER;
}
} }
} }
@ -528,19 +563,17 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception { throws Exception {
try { try {
if (!needsCleanup) { ChannelBuffer cumulation = this.cumulation;
if (cumulation == null) {
return; return;
} else {
needsCleanup = false;
} }
ChannelBuffer cumulation = this.cumulation;
this.cumulation = null; this.cumulation = null;
replayable.terminate(); replayable.terminate();
if (cumulation != null && cumulation.readable()) { if (cumulation != null && cumulation.readable()) {
// Make sure all data was read before notifying a closed channel. // Make sure all data was read before notifying a closed channel.
callDecode(ctx, e.getChannel(), cumulation, null); callDecode(ctx, e.getChannel(), cumulation, replayable, null);
} }
// Call decodeLast() finally. Please note that decodeLast() is // Call decodeLast() finally. Please note that decodeLast() is
@ -558,19 +591,9 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
} }
} }
private ChannelBuffer cumulation(ChannelHandlerContext ctx) { private ChannelBuffer newCumulationBuffer(
ChannelBuffer buf = this.cumulation; ChannelHandlerContext ctx, int minimumCapacity) {
if (buf == null) { return new UnsafeDynamicChannelBuffer(
ctx.getChannel().getConfig().getBufferFactory(), minimumCapacity);
if (cumulation == null) {
ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
buf = new UnsafeDynamicChannelBuffer(factory);
cumulation = buf;
replayable = new ReplayingDecoderBuffer(buf);
} else {
buf = cumulation;
}
}
return buf;
} }
} }

View File

@ -20,8 +20,8 @@ import io.netty.buffer.DynamicChannelBuffer;
class UnsafeDynamicChannelBuffer extends DynamicChannelBuffer { class UnsafeDynamicChannelBuffer extends DynamicChannelBuffer {
UnsafeDynamicChannelBuffer(ChannelBufferFactory factory) { UnsafeDynamicChannelBuffer(ChannelBufferFactory factory, int minimumCapacity) {
super(factory.getDefaultOrder(), 256, factory); super(factory.getDefaultOrder(), minimumCapacity, factory);
} }
@Override @Override