From 8eeb72cbbf79496d317462a8bdf63e461ba42717 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Tue, 17 Nov 2009 15:46:34 +0000 Subject: [PATCH] Reverted the previous optimization because it might perform very bad as the number of components in the composite buffer increases --- .../frame/DelimiterBasedFrameDecoder.java | 6 +-- .../codec/frame/FixedLengthFrameDecoder.java | 4 +- .../handler/codec/frame/FrameDecoder.java | 40 ++++++++++++------- .../frame/LengthFieldBasedFrameDecoder.java | 8 +--- .../ProtobufVarint32FrameDecoder.java | 6 +-- 5 files changed, 32 insertions(+), 32 deletions(-) diff --git a/src/main/java/org/jboss/netty/handler/codec/frame/DelimiterBasedFrameDecoder.java b/src/main/java/org/jboss/netty/handler/codec/frame/DelimiterBasedFrameDecoder.java index dbfc1ccc78..c79325bc8b 100644 --- a/src/main/java/org/jboss/netty/handler/codec/frame/DelimiterBasedFrameDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/frame/DelimiterBasedFrameDecoder.java @@ -179,12 +179,12 @@ public class DelimiterBasedFrameDecoder extends FrameDecoder { } if (stripDelimiter) { - frame = buffer.slice(buffer.readerIndex(), minFrameLength); + frame = buffer.readBytes(minFrameLength); + buffer.skipBytes(minDelimLength); } else { - frame = buffer.slice(buffer.readerIndex(), minFrameLength + minDelimLength); + frame = buffer.readBytes(minFrameLength + minDelimLength); } - buffer.skipBytes(minFrameLength + minDelimLength); return frame; } else { if (buffer.readableBytes() > maxFrameLength) { diff --git a/src/main/java/org/jboss/netty/handler/codec/frame/FixedLengthFrameDecoder.java b/src/main/java/org/jboss/netty/handler/codec/frame/FixedLengthFrameDecoder.java index 3fe7bd81a5..27f01c50f9 100644 --- a/src/main/java/org/jboss/netty/handler/codec/frame/FixedLengthFrameDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/frame/FixedLengthFrameDecoder.java @@ -63,9 +63,7 @@ public class FixedLengthFrameDecoder extends FrameDecoder { if (buffer.readableBytes() < frameLength) { return null; } else { - ChannelBuffer frame = buffer.slice(buffer.readerIndex(), frameLength); - buffer.skipBytes(frameLength); - return frame; + return buffer.readBytes(frameLength); } } } diff --git a/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java b/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java index ee7e3fdf22..4b76ff7756 100644 --- a/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java @@ -16,6 +16,7 @@ package org.jboss.netty.handler.codec.frame; import java.net.SocketAddress; +import java.util.concurrent.atomic.AtomicReference; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; @@ -180,8 +181,8 @@ import org.jboss.netty.handler.codec.replay.ReplayingDecoder; public abstract class FrameDecoder extends SimpleChannelUpstreamHandler { private final boolean unfold; - private ChannelBuffer cumulation = ChannelBuffers.EMPTY_BUFFER; - private boolean cleanedUp; + private final AtomicReference cumulation = + new AtomicReference(); protected FrameDecoder() { this(false); @@ -206,17 +207,16 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler { return; } - ChannelBuffer cumulation = this.cumulation; + ChannelBuffer cumulation = cumulation(ctx); if (cumulation.readable()) { - cumulation = ChannelBuffers.wrappedBuffer(cumulation, input); - } else { - cumulation = input; - } - - try { + cumulation.discardReadBytes(); + cumulation.writeBytes(input); callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress()); - } finally { - this.cumulation = cumulation.slice(); + } else { + callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); + if (input.readable()) { + cumulation.writeBytes(input); + } } } @@ -321,12 +321,10 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler { private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { try { - if (cleanedUp) { + ChannelBuffer cumulation = this.cumulation.getAndSet(null); + if (cumulation == null) { return; } - cleanedUp = true; - ChannelBuffer cumulation = this.cumulation; - this.cumulation = ChannelBuffers.EMPTY_BUFFER; if (cumulation.readable()) { // Make sure all frames are read before notifying a closed channel. @@ -344,4 +342,16 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler { ctx.sendUpstream(e); } } + + private ChannelBuffer cumulation(ChannelHandlerContext ctx) { + ChannelBuffer buf = cumulation.get(); + if (buf == null) { + buf = ChannelBuffers.dynamicBuffer( + ctx.getChannel().getConfig().getBufferFactory()); + if (!cumulation.compareAndSet(null, buf)) { + buf = cumulation.get(); + } + } + return buf; + } } diff --git a/src/main/java/org/jboss/netty/handler/codec/frame/LengthFieldBasedFrameDecoder.java b/src/main/java/org/jboss/netty/handler/codec/frame/LengthFieldBasedFrameDecoder.java index 0988630b06..7652ac5ec3 100644 --- a/src/main/java/org/jboss/netty/handler/codec/frame/LengthFieldBasedFrameDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/frame/LengthFieldBasedFrameDecoder.java @@ -363,11 +363,7 @@ public class LengthFieldBasedFrameDecoder extends FrameDecoder { "Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + initialBytesToStrip); } - - ChannelBuffer frame = buffer.slice( - buffer.readerIndex() + initialBytesToStrip, - frameLengthInt - initialBytesToStrip); - buffer.skipBytes(frameLengthInt); - return frame; + buffer.skipBytes(initialBytesToStrip); + return buffer.readBytes(frameLengthInt - initialBytesToStrip); } } diff --git a/src/main/java/org/jboss/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java b/src/main/java/org/jboss/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java index 722103343a..57fbfe7782 100644 --- a/src/main/java/org/jboss/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java @@ -55,11 +55,7 @@ public class ProtobufVarint32FrameDecoder extends FrameDecoder { break; } - // TODO Extract (ChannelBuffer.slice(int, int) + skipBytes(int)) - // into a new method in ChannelBuffer - ChannelBuffer frame = buffer.slice(buffer.readerIndex(), messageSize); - buffer.skipBytes(messageSize); - return frame; + return buffer.readBytes(messageSize); } }