Reverted the previous optimization because it might perform very bad as the number of components in the composite buffer increases
This commit is contained in:
parent
902ab48820
commit
8eeb72cbbf
@ -179,12 +179,12 @@ public class DelimiterBasedFrameDecoder extends FrameDecoder {
|
||||
}
|
||||
|
||||
if (stripDelimiter) {
|
||||
frame = buffer.slice(buffer.readerIndex(), minFrameLength);
|
||||
frame = buffer.readBytes(minFrameLength);
|
||||
buffer.skipBytes(minDelimLength);
|
||||
} else {
|
||||
frame = buffer.slice(buffer.readerIndex(), minFrameLength + minDelimLength);
|
||||
frame = buffer.readBytes(minFrameLength + minDelimLength);
|
||||
}
|
||||
|
||||
buffer.skipBytes(minFrameLength + minDelimLength);
|
||||
return frame;
|
||||
} else {
|
||||
if (buffer.readableBytes() > maxFrameLength) {
|
||||
|
@ -63,9 +63,7 @@ public class FixedLengthFrameDecoder extends FrameDecoder {
|
||||
if (buffer.readableBytes() < frameLength) {
|
||||
return null;
|
||||
} else {
|
||||
ChannelBuffer frame = buffer.slice(buffer.readerIndex(), frameLength);
|
||||
buffer.skipBytes(frameLength);
|
||||
return frame;
|
||||
return buffer.readBytes(frameLength);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,7 @@
|
||||
package org.jboss.netty.handler.codec.frame;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
@ -180,8 +181,8 @@ import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
|
||||
public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
|
||||
|
||||
private final boolean unfold;
|
||||
private ChannelBuffer cumulation = ChannelBuffers.EMPTY_BUFFER;
|
||||
private boolean cleanedUp;
|
||||
private final AtomicReference<ChannelBuffer> cumulation =
|
||||
new AtomicReference<ChannelBuffer>();
|
||||
|
||||
protected FrameDecoder() {
|
||||
this(false);
|
||||
@ -206,17 +207,16 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
|
||||
return;
|
||||
}
|
||||
|
||||
ChannelBuffer cumulation = this.cumulation;
|
||||
ChannelBuffer cumulation = cumulation(ctx);
|
||||
if (cumulation.readable()) {
|
||||
cumulation = ChannelBuffers.wrappedBuffer(cumulation, input);
|
||||
} else {
|
||||
cumulation = input;
|
||||
}
|
||||
|
||||
try {
|
||||
cumulation.discardReadBytes();
|
||||
cumulation.writeBytes(input);
|
||||
callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
|
||||
} finally {
|
||||
this.cumulation = cumulation.slice();
|
||||
} else {
|
||||
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
|
||||
if (input.readable()) {
|
||||
cumulation.writeBytes(input);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -321,12 +321,10 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
|
||||
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
|
||||
throws Exception {
|
||||
try {
|
||||
if (cleanedUp) {
|
||||
ChannelBuffer cumulation = this.cumulation.getAndSet(null);
|
||||
if (cumulation == null) {
|
||||
return;
|
||||
}
|
||||
cleanedUp = true;
|
||||
ChannelBuffer cumulation = this.cumulation;
|
||||
this.cumulation = ChannelBuffers.EMPTY_BUFFER;
|
||||
|
||||
if (cumulation.readable()) {
|
||||
// Make sure all frames are read before notifying a closed channel.
|
||||
@ -344,4 +342,16 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
|
||||
ctx.sendUpstream(e);
|
||||
}
|
||||
}
|
||||
|
||||
private ChannelBuffer cumulation(ChannelHandlerContext ctx) {
|
||||
ChannelBuffer buf = cumulation.get();
|
||||
if (buf == null) {
|
||||
buf = ChannelBuffers.dynamicBuffer(
|
||||
ctx.getChannel().getConfig().getBufferFactory());
|
||||
if (!cumulation.compareAndSet(null, buf)) {
|
||||
buf = cumulation.get();
|
||||
}
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
}
|
||||
|
@ -363,11 +363,7 @@ public class LengthFieldBasedFrameDecoder extends FrameDecoder {
|
||||
"Adjusted frame length (" + frameLength + ") is less " +
|
||||
"than initialBytesToStrip: " + initialBytesToStrip);
|
||||
}
|
||||
|
||||
ChannelBuffer frame = buffer.slice(
|
||||
buffer.readerIndex() + initialBytesToStrip,
|
||||
frameLengthInt - initialBytesToStrip);
|
||||
buffer.skipBytes(frameLengthInt);
|
||||
return frame;
|
||||
buffer.skipBytes(initialBytesToStrip);
|
||||
return buffer.readBytes(frameLengthInt - initialBytesToStrip);
|
||||
}
|
||||
}
|
||||
|
@ -55,11 +55,7 @@ public class ProtobufVarint32FrameDecoder extends FrameDecoder {
|
||||
break;
|
||||
}
|
||||
|
||||
// TODO Extract (ChannelBuffer.slice(int, int) + skipBytes(int))
|
||||
// into a new method in ChannelBuffer
|
||||
ChannelBuffer frame = buffer.slice(buffer.readerIndex(), messageSize);
|
||||
buffer.skipBytes(messageSize);
|
||||
return frame;
|
||||
return buffer.readBytes(messageSize);
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user