diff --git a/src/main/java/org/jboss/netty/handler/codec/frame/DelimiterBasedFrameDecoder.java b/src/main/java/org/jboss/netty/handler/codec/frame/DelimiterBasedFrameDecoder.java index c79325bc8b..dbfc1ccc78 100644 --- a/src/main/java/org/jboss/netty/handler/codec/frame/DelimiterBasedFrameDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/frame/DelimiterBasedFrameDecoder.java @@ -179,12 +179,12 @@ public class DelimiterBasedFrameDecoder extends FrameDecoder { } if (stripDelimiter) { - frame = buffer.readBytes(minFrameLength); - buffer.skipBytes(minDelimLength); + frame = buffer.slice(buffer.readerIndex(), minFrameLength); } else { - frame = buffer.readBytes(minFrameLength + minDelimLength); + frame = buffer.slice(buffer.readerIndex(), minFrameLength + minDelimLength); } + buffer.skipBytes(minFrameLength + minDelimLength); return frame; } else { if (buffer.readableBytes() > maxFrameLength) { diff --git a/src/main/java/org/jboss/netty/handler/codec/frame/FixedLengthFrameDecoder.java b/src/main/java/org/jboss/netty/handler/codec/frame/FixedLengthFrameDecoder.java index 27f01c50f9..3fe7bd81a5 100644 --- a/src/main/java/org/jboss/netty/handler/codec/frame/FixedLengthFrameDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/frame/FixedLengthFrameDecoder.java @@ -63,7 +63,9 @@ public class FixedLengthFrameDecoder extends FrameDecoder { if (buffer.readableBytes() < frameLength) { return null; } else { - return buffer.readBytes(frameLength); + ChannelBuffer frame = buffer.slice(buffer.readerIndex(), frameLength); + buffer.skipBytes(frameLength); + return frame; } } } diff --git a/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java b/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java index 4b76ff7756..ee7e3fdf22 100644 --- a/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java @@ -16,7 +16,6 @@ package org.jboss.netty.handler.codec.frame; import java.net.SocketAddress; -import java.util.concurrent.atomic.AtomicReference; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; @@ -181,8 +180,8 @@ import org.jboss.netty.handler.codec.replay.ReplayingDecoder; public abstract class FrameDecoder extends SimpleChannelUpstreamHandler { private final boolean unfold; - private final AtomicReference cumulation = - new AtomicReference(); + private ChannelBuffer cumulation = ChannelBuffers.EMPTY_BUFFER; + private boolean cleanedUp; protected FrameDecoder() { this(false); @@ -207,16 +206,17 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler { return; } - ChannelBuffer cumulation = cumulation(ctx); + ChannelBuffer cumulation = this.cumulation; if (cumulation.readable()) { - cumulation.discardReadBytes(); - cumulation.writeBytes(input); - callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress()); + cumulation = ChannelBuffers.wrappedBuffer(cumulation, input); } else { - callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); - if (input.readable()) { - cumulation.writeBytes(input); - } + cumulation = input; + } + + try { + callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress()); + } finally { + this.cumulation = cumulation.slice(); } } @@ -321,10 +321,12 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler { private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { try { - ChannelBuffer cumulation = this.cumulation.getAndSet(null); - if (cumulation == null) { + if (cleanedUp) { return; } + cleanedUp = true; + ChannelBuffer cumulation = this.cumulation; + this.cumulation = ChannelBuffers.EMPTY_BUFFER; if (cumulation.readable()) { // Make sure all frames are read before notifying a closed channel. @@ -342,16 +344,4 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler { ctx.sendUpstream(e); } } - - private ChannelBuffer cumulation(ChannelHandlerContext ctx) { - ChannelBuffer buf = cumulation.get(); - if (buf == null) { - buf = ChannelBuffers.dynamicBuffer( - ctx.getChannel().getConfig().getBufferFactory()); - if (!cumulation.compareAndSet(null, buf)) { - buf = cumulation.get(); - } - } - return buf; - } } diff --git a/src/main/java/org/jboss/netty/handler/codec/frame/LengthFieldBasedFrameDecoder.java b/src/main/java/org/jboss/netty/handler/codec/frame/LengthFieldBasedFrameDecoder.java index 7652ac5ec3..0988630b06 100644 --- a/src/main/java/org/jboss/netty/handler/codec/frame/LengthFieldBasedFrameDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/frame/LengthFieldBasedFrameDecoder.java @@ -363,7 +363,11 @@ public class LengthFieldBasedFrameDecoder extends FrameDecoder { "Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + initialBytesToStrip); } - buffer.skipBytes(initialBytesToStrip); - return buffer.readBytes(frameLengthInt - initialBytesToStrip); + + ChannelBuffer frame = buffer.slice( + buffer.readerIndex() + initialBytesToStrip, + frameLengthInt - initialBytesToStrip); + buffer.skipBytes(frameLengthInt); + return frame; } } diff --git a/src/main/java/org/jboss/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java b/src/main/java/org/jboss/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java index 57fbfe7782..722103343a 100644 --- a/src/main/java/org/jboss/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java @@ -55,7 +55,11 @@ public class ProtobufVarint32FrameDecoder extends FrameDecoder { break; } - return buffer.readBytes(messageSize); + // TODO Extract (ChannelBuffer.slice(int, int) + skipBytes(int)) + // into a new method in ChannelBuffer + ChannelBuffer frame = buffer.slice(buffer.readerIndex(), messageSize); + buffer.skipBytes(messageSize); + return frame; } } diff --git a/src/test/java/org/jboss/netty/handler/codec/frame/AbstractSocketFixedLengthEchoTest.java b/src/test/java/org/jboss/netty/handler/codec/frame/AbstractSocketFixedLengthEchoTest.java index d971fc452f..1d77ac9426 100644 --- a/src/test/java/org/jboss/netty/handler/codec/frame/AbstractSocketFixedLengthEchoTest.java +++ b/src/test/java/org/jboss/netty/handler/codec/frame/AbstractSocketFixedLengthEchoTest.java @@ -97,7 +97,7 @@ public abstract class AbstractSocketFixedLengthEchoTest { Channel cc = ccf.getChannel(); for (int i = 0; i < data.length;) { - int length = Math.min(random.nextInt(1024 * 64), data.length - i); + int length = Math.min(random.nextInt(1024 * 3), data.length - i); cc.write(ChannelBuffers.wrappedBuffer(data, i, length)); i += length; }