* Rewrote FrameDecoder by utilizing the latest improvement in CompositeChannelBuffer

** A user doesn't need to make a copy of the cumulative buffer anymore.
*** Modified all FrameDecoder subtypes to use slice() instead of readBytes()
* Reduced the maximum length of the random writes in AbstractSocketFixedLengthEchoTest to increase the probability of composite buffer occurances
This commit is contained in:
Trustin Lee 2009-11-17 15:11:31 +00:00
parent 96bbbbb7f1
commit 902ab48820
6 changed files with 33 additions and 33 deletions

View File

@ -179,12 +179,12 @@ public class DelimiterBasedFrameDecoder extends FrameDecoder {
} }
if (stripDelimiter) { if (stripDelimiter) {
frame = buffer.readBytes(minFrameLength); frame = buffer.slice(buffer.readerIndex(), minFrameLength);
buffer.skipBytes(minDelimLength);
} else { } else {
frame = buffer.readBytes(minFrameLength + minDelimLength); frame = buffer.slice(buffer.readerIndex(), minFrameLength + minDelimLength);
} }
buffer.skipBytes(minFrameLength + minDelimLength);
return frame; return frame;
} else { } else {
if (buffer.readableBytes() > maxFrameLength) { if (buffer.readableBytes() > maxFrameLength) {

View File

@ -63,7 +63,9 @@ public class FixedLengthFrameDecoder extends FrameDecoder {
if (buffer.readableBytes() < frameLength) { if (buffer.readableBytes() < frameLength) {
return null; return null;
} else { } else {
return buffer.readBytes(frameLength); ChannelBuffer frame = buffer.slice(buffer.readerIndex(), frameLength);
buffer.skipBytes(frameLength);
return frame;
} }
} }
} }

View File

@ -16,7 +16,6 @@
package org.jboss.netty.handler.codec.frame; package org.jboss.netty.handler.codec.frame;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicReference;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.buffer.ChannelBuffers;
@ -181,8 +180,8 @@ import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
public abstract class FrameDecoder extends SimpleChannelUpstreamHandler { public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
private final boolean unfold; private final boolean unfold;
private final AtomicReference<ChannelBuffer> cumulation = private ChannelBuffer cumulation = ChannelBuffers.EMPTY_BUFFER;
new AtomicReference<ChannelBuffer>(); private boolean cleanedUp;
protected FrameDecoder() { protected FrameDecoder() {
this(false); this(false);
@ -207,16 +206,17 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
return; return;
} }
ChannelBuffer cumulation = cumulation(ctx); ChannelBuffer cumulation = this.cumulation;
if (cumulation.readable()) { if (cumulation.readable()) {
cumulation.discardReadBytes(); cumulation = ChannelBuffers.wrappedBuffer(cumulation, input);
cumulation.writeBytes(input);
callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
} else { } else {
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); cumulation = input;
if (input.readable()) { }
cumulation.writeBytes(input);
} try {
callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
} finally {
this.cumulation = cumulation.slice();
} }
} }
@ -321,10 +321,12 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception { throws Exception {
try { try {
ChannelBuffer cumulation = this.cumulation.getAndSet(null); if (cleanedUp) {
if (cumulation == null) {
return; return;
} }
cleanedUp = true;
ChannelBuffer cumulation = this.cumulation;
this.cumulation = ChannelBuffers.EMPTY_BUFFER;
if (cumulation.readable()) { if (cumulation.readable()) {
// Make sure all frames are read before notifying a closed channel. // Make sure all frames are read before notifying a closed channel.
@ -342,16 +344,4 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
ctx.sendUpstream(e); ctx.sendUpstream(e);
} }
} }
private ChannelBuffer cumulation(ChannelHandlerContext ctx) {
ChannelBuffer buf = cumulation.get();
if (buf == null) {
buf = ChannelBuffers.dynamicBuffer(
ctx.getChannel().getConfig().getBufferFactory());
if (!cumulation.compareAndSet(null, buf)) {
buf = cumulation.get();
}
}
return buf;
}
} }

View File

@ -363,7 +363,11 @@ public class LengthFieldBasedFrameDecoder extends FrameDecoder {
"Adjusted frame length (" + frameLength + ") is less " + "Adjusted frame length (" + frameLength + ") is less " +
"than initialBytesToStrip: " + initialBytesToStrip); "than initialBytesToStrip: " + initialBytesToStrip);
} }
buffer.skipBytes(initialBytesToStrip);
return buffer.readBytes(frameLengthInt - initialBytesToStrip); ChannelBuffer frame = buffer.slice(
buffer.readerIndex() + initialBytesToStrip,
frameLengthInt - initialBytesToStrip);
buffer.skipBytes(frameLengthInt);
return frame;
} }
} }

View File

@ -55,7 +55,11 @@ public class ProtobufVarint32FrameDecoder extends FrameDecoder {
break; break;
} }
return buffer.readBytes(messageSize); // TODO Extract (ChannelBuffer.slice(int, int) + skipBytes(int))
// into a new method in ChannelBuffer
ChannelBuffer frame = buffer.slice(buffer.readerIndex(), messageSize);
buffer.skipBytes(messageSize);
return frame;
} }
} }

View File

@ -97,7 +97,7 @@ public abstract class AbstractSocketFixedLengthEchoTest {
Channel cc = ccf.getChannel(); Channel cc = ccf.getChannel();
for (int i = 0; i < data.length;) { for (int i = 0; i < data.length;) {
int length = Math.min(random.nextInt(1024 * 64), data.length - i); int length = Math.min(random.nextInt(1024 * 3), data.length - i);
cc.write(ChannelBuffers.wrappedBuffer(data, i, length)); cc.write(ChannelBuffers.wrappedBuffer(data, i, length));
i += length; i += length;
} }