From 641e5368d02f08242adaefb7f547d6a3bba6050f Mon Sep 17 00:00:00 2001 From: Gerd Behrmann Date: Mon, 11 Jun 2012 17:11:59 +0200 Subject: [PATCH 1/3] Add ZeroCopyFrameDecoder Copy of FrameDecoder that avoids copying partial frames to a cumulation buffer. Instead buffers are kept on a list and composed to a CompositeChannelBuffer when needed. CompositeChannelBuffers's decomposite method conveniently allows buffers from which all data was read to be discarded. The documented interface stays compatible with FrameDecoder, however undocumented behaviour used by ReplyDecoder and other subclasses has changed. For this reason a new class was introduced rather than modifying the existing FrameDecoder. --- .../codec/frame/ZeroCopyFrameDecoder.java | 464 ++++++++++++++++++ 1 file changed, 464 insertions(+) create mode 100644 src/main/java/org/jboss/netty/handler/codec/frame/ZeroCopyFrameDecoder.java diff --git a/src/main/java/org/jboss/netty/handler/codec/frame/ZeroCopyFrameDecoder.java b/src/main/java/org/jboss/netty/handler/codec/frame/ZeroCopyFrameDecoder.java new file mode 100644 index 0000000000..b35b928e8a --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/frame/ZeroCopyFrameDecoder.java @@ -0,0 +1,464 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.handler.codec.frame; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.buffer.CompositeChannelBuffer; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandler; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.LifeCycleAwareChannelHandler; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; + +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.List; + +/** + * Decodes the received {@link org.jboss.netty.buffer.ChannelBuffer}s into a meaningful frame object. + *

+ * In a stream-based transport such as TCP/IP, packets can be fragmented and + * reassembled during transmission even in a LAN environment. For example, + * let us assume you have received three packets: + *

+ * +-----+-----+-----+
+ * | ABC | DEF | GHI |
+ * +-----+-----+-----+
+ * 
+ * because of the packet fragmentation, a server can receive them like the + * following: + *
+ * +----+-------+---+---+
+ * | AB | CDEFG | H | I |
+ * +----+-------+---+---+
+ * 
+ *

+ * {@link org.jboss.netty.handler.codec.frame.ZeroCopyFrameDecoder} helps you defrag the received packets into one or more + * meaningful frames that could be easily understood by the + * application logic. In case of the example above, your {@link org.jboss.netty.handler.codec.frame.ZeroCopyFrameDecoder} + * implementation could defrag the received packets like the following: + *

+ * +-----+-----+-----+
+ * | ABC | DEF | GHI |
+ * +-----+-----+-----+
+ * 
+ *

+ * The following code shows an example handler which decodes a frame whose + * first 4 bytes header represents the length of the frame, excluding the + * header. + *

+ * MESSAGE FORMAT
+ * ==============
+ *
+ * Offset:  0        4                   (Length + 4)
+ *          +--------+------------------------+
+ * Fields:  | Length | Actual message content |
+ *          +--------+------------------------+
+ *
+ * DECODER IMPLEMENTATION
+ * ======================
+ *
+ * public class IntegerHeaderFrameDecoder extends {@link org.jboss.netty.handler.codec.frame.ZeroCopyFrameDecoder} {
+ *
+ *   {@code @Override}
+ *   protected Object decode({@link org.jboss.netty.channel.ChannelHandlerContext} ctx,
+ *                           {@link org.jboss.netty.channel.Channel channel},
+ *                           {@link org.jboss.netty.buffer.ChannelBuffer} buf) throws Exception {
+ *
+ *     // Make sure if the length field was received.
+ *     if (buf.readableBytes() < 4) {
+ *        // The length field was not received yet - return null.
+ *        // This method will be invoked again when more packets are
+ *        // received and appended to the buffer.
+ *        return null;
+ *     }
+ *
+ *     // The length field is in the buffer.
+ *
+ *     // Mark the current buffer position before reading the length field
+ *     // because the whole frame might not be in the buffer yet.
+ *     // We will reset the buffer position to the marked position if
+ *     // there's not enough bytes in the buffer.
+ *     buf.markReaderIndex();
+ *
+ *     // Read the length field.
+ *     int length = buf.readInt();
+ *
+ *     // Make sure if there's enough bytes in the buffer.
+ *     if (buf.readableBytes() < length) {
+ *        // The whole bytes were not received yet - return null.
+ *        // This method will be invoked again when more packets are
+ *        // received and appended to the buffer.
+ *
+ *        // Reset to the marked position to read the length field again
+ *        // next time.
+ *        buf.resetReaderIndex();
+ *
+ *        return null;
+ *     }
+ *
+ *     // There's enough bytes in the buffer. Read it.
+ *     {@link org.jboss.netty.buffer.ChannelBuffer} frame = buf.readBytes(length);
+ *
+ *     // Successfully decoded a frame.  Return the decoded frame.
+ *     return frame;
+ *   }
+ * }
+ * 
+ * + *

Returning a POJO rather than a {@link org.jboss.netty.buffer.ChannelBuffer}

+ *

+ * Please note that you can return an object of a different type than + * {@link org.jboss.netty.buffer.ChannelBuffer} in your {@code decode()} and {@code decodeLast()} + * implementation. For example, you could return a + * POJO so that the next + * {@link org.jboss.netty.channel.ChannelUpstreamHandler} receives a {@link org.jboss.netty.channel.MessageEvent} which + * contains a POJO rather than a {@link org.jboss.netty.buffer.ChannelBuffer}. + * + *

Replacing a decoder with another decoder in a pipeline

+ *

+ * If you are going to write a protocol multiplexer, you will probably want to + * replace a {@link org.jboss.netty.handler.codec.frame.ZeroCopyFrameDecoder} (protocol detector) with another + * {@link org.jboss.netty.handler.codec.frame.ZeroCopyFrameDecoder} or {@link org.jboss.netty.handler.codec.replay.ReplayingDecoder} (actual protocol decoder). + * It is not possible to achieve this simply by calling + * {@link org.jboss.netty.channel.ChannelPipeline#replace(org.jboss.netty.channel.ChannelHandler, String, org.jboss.netty.channel.ChannelHandler)}, but + * some additional steps are required: + *

+ * public class FirstDecoder extends {@link org.jboss.netty.handler.codec.frame.ZeroCopyFrameDecoder} {
+ *
+ *     public FirstDecoder() {
+ *         super(true); // Enable unfold
+ *     }
+ *
+ *     {@code @Override}
+ *     protected Object decode({@link org.jboss.netty.channel.ChannelHandlerContext} ctx,
+ *                             {@link org.jboss.netty.channel.Channel} channel,
+ *                             {@link org.jboss.netty.buffer.ChannelBuffer} buf) {
+ *         ...
+ *         // Decode the first message
+ *         Object firstMessage = ...;
+ *
+ *         // Add the second decoder
+ *         ctx.getPipeline().addLast("second", new SecondDecoder());
+ *
+ *         // Remove the first decoder (me)
+ *         ctx.getPipeline().remove(this);
+ *
+ *         if (buf.readable()) {
+ *             // Hand off the remaining data to the second decoder
+ *             return new Object[] { firstMessage, buf.readBytes(buf.readableBytes()) };
+ *         } else {
+ *             // Nothing to hand off
+ *             return firstMessage;
+ *         }
+ *     }
+ * }
+ * 
+ * + * @apiviz.landmark + */ +public abstract class ZeroCopyFrameDecoder extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler { + + private final boolean unfold; + protected List cumulation; + private volatile ChannelHandlerContext ctx; + + protected ZeroCopyFrameDecoder() { + this(false); + } + + protected ZeroCopyFrameDecoder(boolean unfold) { + this.unfold = unfold; + } + + @Override + public void messageReceived( + ChannelHandlerContext ctx, MessageEvent e) throws Exception { + + Object m = e.getMessage(); + if (!(m instanceof ChannelBuffer)) { + ctx.sendUpstream(e); + return; + } + + ChannelBuffer input = (ChannelBuffer) m; + if (!input.readable()) { + return; + } + + if (cumulation == null) { + // Wrap in try / finally. + // + // See https://github.com/netty/netty/issues/364 + try { + // the cumulation buffer is not created yet so just pass the input to callDecode(...) method + callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); + } finally { + if (input.readable()) { + // seems like there is something readable left in the input buffer. So create + // the cumulation buffer and copy the input into it + cumulation = new ArrayList(2); + cumulation.add(input); + } + } + + } else { + if (!cumulation.get(0).order().equals(input.order())) { + throw new IllegalArgumentException( + "inconsistent byte order"); + } + + cumulation.add(input); + + CompositeChannelBuffer buf = + new CompositeChannelBuffer(input.order(), cumulation, false); + + // Wrap in try / finally. + // + // See https://github.com/netty/netty/issues/364 + try { + callDecode(ctx, e.getChannel(), buf, e.getRemoteAddress()); + } finally { + if (!buf.readable()) { + // nothing readable left so reset the state + cumulation = null; + } else { + cumulation = buf.decompose(buf.readerIndex(), buf.readableBytes()); + } + } + + } + } + + @Override + public void channelDisconnected( + ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { + cleanup(ctx, e); + } + + @Override + public void channelClosed( + ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { + cleanup(ctx, e); + } + + @Override + public void exceptionCaught( + ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { + ctx.sendUpstream(e); + } + + /** + * Decodes the received packets so far into a frame. + * + * @param ctx the context of this handler + * @param channel the current channel + * @param buffer the cumulative buffer of received packets so far. + * Note that the buffer might be empty, which means you + * should not make an assumption that the buffer contains + * at least one byte in your decoder implementation. + * + * @return the decoded frame if a full frame was received and decoded. + * {@code null} if there's not enough data in the buffer to decode a frame. + */ + protected abstract Object decode( + ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception; + + /** + * Decodes the received data so far into a frame when the channel is + * disconnected. + * + * @param ctx the context of this handler + * @param channel the current channel + * @param buffer the cumulative buffer of received packets so far. + * Note that the buffer might be empty, which means you + * should not make an assumption that the buffer contains + * at least one byte in your decoder implementation. + * + * @return the decoded frame if a full frame was received and decoded. + * {@code null} if there's not enough data in the buffer to decode a frame. + */ + protected Object decodeLast( + ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { + return decode(ctx, channel, buffer); + } + + private void callDecode( + ChannelHandlerContext context, Channel channel, + ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception { + + while (cumulation.readable()) { + int oldReaderIndex = cumulation.readerIndex(); + Object frame = decode(context, channel, cumulation); + if (frame == null) { + if (oldReaderIndex == cumulation.readerIndex()) { + // Seems like more data is required. + // Let us wait for the next notification. + break; + } else { + // Previous data has been discarded. + // Probably it is reading on. + continue; + } + } else if (oldReaderIndex == cumulation.readerIndex()) { + throw new IllegalStateException( + "decode() method must read at least one byte " + + "if it returned a frame (caused by: " + getClass() + ")"); + } + + unfoldAndFireMessageReceived(context, remoteAddress, frame); + } + } + + protected final void unfoldAndFireMessageReceived( + ChannelHandlerContext context, SocketAddress remoteAddress, Object result) { + if (unfold) { + if (result instanceof Object[]) { + for (Object r: (Object[]) result) { + Channels.fireMessageReceived(context, r, remoteAddress); + } + } else if (result instanceof Iterable) { + for (Object r: (Iterable) result) { + Channels.fireMessageReceived(context, r, remoteAddress); + } + } else { + Channels.fireMessageReceived(context, result, remoteAddress); + } + } else { + Channels.fireMessageReceived(context, result, remoteAddress); + } + } + + /** + * Gets called on {@link #channelDisconnected(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ChannelStateEvent)} and + * {@link #channelClosed(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ChannelStateEvent)} + */ + protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) + throws Exception { + try { + List cumulation = this.cumulation; + if (cumulation == null) { + return; + } + + this.cumulation = null; + + CompositeChannelBuffer buf = + new CompositeChannelBuffer(cumulation.get(0).order(), cumulation, false); + + // Make sure all frames are read before notifying a closed channel. + callDecode(ctx, ctx.getChannel(), buf, null); + + // Call decodeLast() finally. Please note that decodeLast() is + // called even if there's nothing more to read from the buffer to + // notify a user that the connection was closed explicitly. + Object partialFrame = decodeLast(ctx, ctx.getChannel(), buf); + if (partialFrame != null) { + unfoldAndFireMessageReceived(ctx, null, partialFrame); + } + } finally { + ctx.sendUpstream(e); + } + } + + /** + * Create a new {@link org.jboss.netty.buffer.ChannelBuffer} which is used for the cumulation. + * Sub-classes may override this. + * + * @param ctx {@link org.jboss.netty.channel.ChannelHandlerContext} for this handler + * @return buffer the {@link org.jboss.netty.buffer.ChannelBuffer} which is used for cumulation + */ + @Deprecated + protected ChannelBuffer newCumulationBuffer( + ChannelHandlerContext ctx, int minimumCapacity) { + return null; + } + + /** + * Replace this {@link org.jboss.netty.handler.codec.frame.ZeroCopyFrameDecoder} in the {@link org.jboss.netty.channel.ChannelPipeline} with the given {@link org.jboss.netty.channel.ChannelHandler}. All + * remaining bytes in the {@link org.jboss.netty.buffer.ChannelBuffer} will get send to the new {@link org.jboss.netty.channel.ChannelHandler} that was used + * as replacement + * + */ + public void replace(String handlerName, ChannelHandler handler) { + if (ctx == null) { + throw new IllegalStateException( + "Replace cann only be called once the FrameDecoder is added to the ChannelPipeline"); + } + ChannelPipeline pipeline = ctx.getPipeline(); + pipeline.addAfter(ctx.getName(), handlerName, handler); + + try { + if (cumulation != null) { + CompositeChannelBuffer buf = + new CompositeChannelBuffer(cumulation.get(0).order(), cumulation, false); + Channels.fireMessageReceived(ctx, buf); + cumulation = null; + } + } finally { + pipeline.remove(this); + } + + } + + /** + * Returns the actual number of readable bytes in the internal cumulative + * buffer of this decoder. You usually do not need to rely on this value + * to write a decoder. Use it only when you muse use it at your own risk. + * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}. + */ + protected int actualReadableBytes() { + return internalBuffer().readableBytes(); + } + + + + /** + * Returns the internal cumulative buffer of this decoder. You usually + * do not need to access the internal buffer directly to write a decoder. + * Use it only when you must use it at your own risk. + */ + protected ChannelBuffer internalBuffer() { + List buf = cumulation; + if (buf == null) { + return ChannelBuffers.EMPTY_BUFFER; + } + return new CompositeChannelBuffer(cumulation.get(0).order(), cumulation, false); + } + + public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + this.ctx = ctx; + } + + public void afterAdd(ChannelHandlerContext ctx) throws Exception { + // Nothing to do.. + } + + public void beforeRemove(ChannelHandlerContext ctx) throws Exception { + // Nothing to do.. + } + + public void afterRemove(ChannelHandlerContext ctx) throws Exception { + // Nothing to do.. + } + +} From 0086eb3e1d5640bf41be1301a964ddd65ca93ccb Mon Sep 17 00:00:00 2001 From: Gerd Behrmann Date: Tue, 19 Jun 2012 19:19:50 +0200 Subject: [PATCH 2/3] ZeroCopyFrameDecoder: Follow checkstyle rules --- .../codec/frame/ZeroCopyFrameDecoder.java | 66 ++++++++++--------- 1 file changed, 35 insertions(+), 31 deletions(-) diff --git a/src/main/java/org/jboss/netty/handler/codec/frame/ZeroCopyFrameDecoder.java b/src/main/java/org/jboss/netty/handler/codec/frame/ZeroCopyFrameDecoder.java index b35b928e8a..500f489c01 100644 --- a/src/main/java/org/jboss/netty/handler/codec/frame/ZeroCopyFrameDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/frame/ZeroCopyFrameDecoder.java @@ -15,7 +15,12 @@ */ package org.jboss.netty.handler.codec.frame; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.List; + import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBufferFactory; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.buffer.CompositeChannelBuffer; import org.jboss.netty.channel.Channel; @@ -23,18 +28,16 @@ import org.jboss.netty.channel.ChannelHandler; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.ChannelUpstreamHandler; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.LifeCycleAwareChannelHandler; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; - -import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.List; +import org.jboss.netty.handler.codec.replay.ReplayingDecoder; /** - * Decodes the received {@link org.jboss.netty.buffer.ChannelBuffer}s into a meaningful frame object. + * Decodes the received {@link ChannelBuffer}s into a meaningful frame object. *

* In a stream-based transport such as TCP/IP, packets can be fragmented and * reassembled during transmission even in a LAN environment. For example, @@ -52,9 +55,9 @@ import java.util.List; * +----+-------+---+---+ * *

- * {@link org.jboss.netty.handler.codec.frame.ZeroCopyFrameDecoder} helps you defrag the received packets into one or more + * {@link ZeroCopyFrameDecoder} helps you defrag the received packets into one or more * meaningful frames that could be easily understood by the - * application logic. In case of the example above, your {@link org.jboss.netty.handler.codec.frame.ZeroCopyFrameDecoder} + * application logic. In case of the example above, your {@link ZeroCopyFrameDecoder} * implementation could defrag the received packets like the following: *

  * +-----+-----+-----+
@@ -77,12 +80,12 @@ import java.util.List;
  * DECODER IMPLEMENTATION
  * ======================
  *
- * public class IntegerHeaderFrameDecoder extends {@link org.jboss.netty.handler.codec.frame.ZeroCopyFrameDecoder} {
+ * public class IntegerHeaderFrameDecoder extends {@link ZeroCopyFrameDecoder} {
  *
  *   {@code @Override}
- *   protected Object decode({@link org.jboss.netty.channel.ChannelHandlerContext} ctx,
- *                           {@link org.jboss.netty.channel.Channel channel},
- *                           {@link org.jboss.netty.buffer.ChannelBuffer} buf) throws Exception {
+ *   protected Object decode({@link ChannelHandlerContext} ctx,
+ *                           {@link Channel} channel,
+ *                           {@link ChannelBuffer} buf) throws Exception {
  *
  *     // Make sure if the length field was received.
  *     if (buf.readableBytes() < 4) {
@@ -117,7 +120,7 @@ import java.util.List;
  *     }
  *
  *     // There's enough bytes in the buffer. Read it.
- *     {@link org.jboss.netty.buffer.ChannelBuffer} frame = buf.readBytes(length);
+ *     {@link ChannelBuffer} frame = buf.readBytes(length);
  *
  *     // Successfully decoded a frame.  Return the decoded frame.
  *     return frame;
@@ -125,34 +128,34 @@ import java.util.List;
  * }
  * 
* - *

Returning a POJO rather than a {@link org.jboss.netty.buffer.ChannelBuffer}

+ *

Returning a POJO rather than a {@link ChannelBuffer}

*

* Please note that you can return an object of a different type than - * {@link org.jboss.netty.buffer.ChannelBuffer} in your {@code decode()} and {@code decodeLast()} + * {@link ChannelBuffer} in your {@code decode()} and {@code decodeLast()} * implementation. For example, you could return a * POJO so that the next - * {@link org.jboss.netty.channel.ChannelUpstreamHandler} receives a {@link org.jboss.netty.channel.MessageEvent} which - * contains a POJO rather than a {@link org.jboss.netty.buffer.ChannelBuffer}. + * {@link ChannelUpstreamHandler} receives a {@link MessageEvent} which + * contains a POJO rather than a {@link ChannelBuffer}. * *

Replacing a decoder with another decoder in a pipeline

*

* If you are going to write a protocol multiplexer, you will probably want to - * replace a {@link org.jboss.netty.handler.codec.frame.ZeroCopyFrameDecoder} (protocol detector) with another - * {@link org.jboss.netty.handler.codec.frame.ZeroCopyFrameDecoder} or {@link org.jboss.netty.handler.codec.replay.ReplayingDecoder} (actual protocol decoder). + * replace a {@link ZeroCopyFrameDecoder} (protocol detector) with another + * {@link ZeroCopyFrameDecoder} or {@link ReplayingDecoder} (actual protocol decoder). * It is not possible to achieve this simply by calling - * {@link org.jboss.netty.channel.ChannelPipeline#replace(org.jboss.netty.channel.ChannelHandler, String, org.jboss.netty.channel.ChannelHandler)}, but + * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but * some additional steps are required: *

- * public class FirstDecoder extends {@link org.jboss.netty.handler.codec.frame.ZeroCopyFrameDecoder} {
+ * public class FirstDecoder extends {@link ZeroCopyFrameDecoder} {
  *
  *     public FirstDecoder() {
  *         super(true); // Enable unfold
  *     }
  *
  *     {@code @Override}
- *     protected Object decode({@link org.jboss.netty.channel.ChannelHandlerContext} ctx,
- *                             {@link org.jboss.netty.channel.Channel} channel,
- *                             {@link org.jboss.netty.buffer.ChannelBuffer} buf) {
+ *     protected Object decode({@link ChannelHandlerContext} ctx,
+ *                             {@link Channel} channel,
+ *                             {@link ChannelBuffer} buf) {
  *         ...
  *         // Decode the first message
  *         Object firstMessage = ...;
@@ -176,7 +179,8 @@ import java.util.List;
  *
  * @apiviz.landmark
  */
-public abstract class ZeroCopyFrameDecoder extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler {
+public abstract class ZeroCopyFrameDecoder
+        extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler {
 
     private final boolean unfold;
     protected List cumulation;
@@ -349,8 +353,8 @@ public abstract class ZeroCopyFrameDecoder extends SimpleChannelUpstreamHandler
     }
 
     /**
-     * Gets called on {@link #channelDisconnected(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ChannelStateEvent)} and
-     * {@link #channelClosed(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ChannelStateEvent)}
+     * Gets called on {@link #channelDisconnected(ChannelHandlerContext, ChannelStateEvent)} and
+     * {@link #channelClosed(ChannelHandlerContext, ChannelStateEvent)}
      */
     protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
             throws Exception {
@@ -381,11 +385,11 @@ public abstract class ZeroCopyFrameDecoder extends SimpleChannelUpstreamHandler
     }
 
     /**
-     * Create a new {@link org.jboss.netty.buffer.ChannelBuffer} which is used for the cumulation.
+     * Create a new {@link ChannelBuffer} which is used for the cumulation.
      * Sub-classes may override this.
      *
-     * @param ctx {@link org.jboss.netty.channel.ChannelHandlerContext} for this handler
-     * @return buffer the {@link org.jboss.netty.buffer.ChannelBuffer} which is used for cumulation
+     * @param ctx {@link ChannelHandlerContext} for this handler
+     * @return buffer the {@link ChannelBuffer} which is used for cumulation
      */
     @Deprecated
     protected ChannelBuffer newCumulationBuffer(
@@ -394,8 +398,8 @@ public abstract class ZeroCopyFrameDecoder extends SimpleChannelUpstreamHandler
     }
 
     /**
-     * Replace this {@link org.jboss.netty.handler.codec.frame.ZeroCopyFrameDecoder} in the {@link org.jboss.netty.channel.ChannelPipeline} with the given {@link org.jboss.netty.channel.ChannelHandler}. All
-     * remaining bytes in the {@link org.jboss.netty.buffer.ChannelBuffer} will get send to the new {@link org.jboss.netty.channel.ChannelHandler} that was used
+     * Replace this {@link ZeroCopyFrameDecoder} in the {@link ChannelPipeline} with the given {@link ChannelHandler}.
+     * All remaining bytes in the {@link ChannelBuffer} will get send to the new {@link ChannelHandler} that was used
      * as replacement
      *
      */

From e329cc3524b63fc2670778f37c8d0d3ca2328350 Mon Sep 17 00:00:00 2001
From: Gerd Behrmann 
Date: Tue, 19 Jun 2012 22:41:47 +0200
Subject: [PATCH 3/3] ZeroCopyFrameDecoder: Optimize cumulation buffer
 compaction

Avoid calling CompositeChannelBuffer.decompose. A more efficient
frame deocder specific implementation is used that avoids some
of the cost of decomposing a CompositeChannelBuffer.

Added setMaxUnusedBufferCapacity to set a threshold. If a
cumulation buffer wastes more space than the threshold, the
decoder will resort to copying the buffer to free up the unused
space. The semantics are different from FrameDecoder's
setMaxCumulationBufferCapacity in that the threshold is for
unused space, not buffer capacity. This allows the copy of
large buffers to be avoided if only a small amount of space
is to be gained.

If a copy is invoked, only the actual fragment is copied, not
the complete cummulation buffer. This reduces the cost of
copying the buffer.
---
 .../codec/frame/ZeroCopyFrameDecoder.java     | 98 +++++++++++++++----
 1 file changed, 81 insertions(+), 17 deletions(-)

diff --git a/src/main/java/org/jboss/netty/handler/codec/frame/ZeroCopyFrameDecoder.java b/src/main/java/org/jboss/netty/handler/codec/frame/ZeroCopyFrameDecoder.java
index 500f489c01..e74bc0f63c 100644
--- a/src/main/java/org/jboss/netty/handler/codec/frame/ZeroCopyFrameDecoder.java
+++ b/src/main/java/org/jboss/netty/handler/codec/frame/ZeroCopyFrameDecoder.java
@@ -185,6 +185,7 @@ public abstract class ZeroCopyFrameDecoder
     private final boolean unfold;
     protected List cumulation;
     private volatile ChannelHandlerContext ctx;
+    private int copyThreshold;
 
     protected ZeroCopyFrameDecoder() {
         this(false);
@@ -194,6 +195,61 @@ public abstract class ZeroCopyFrameDecoder
         this.unfold = unfold;
     }
 
+    /**
+     * Set the maximal unused capacity of the internal cumulation ChannelBuffer
+     * before the {@link ZeroCopyFrameDecoder} tries to minimize the memory usage by
+     * "byte copy".
+     *
+     *
+     * What you use here really depends on your application and need. Using
+     * {@link Integer#MAX_VALUE} will disable all byte copies but give you the
+     * cost of a higher memory usage if big {@link ChannelBuffer}'s will be
+     * received.
+     *
+     * By default a threshold of 0 is used, which means it will
+     * always copy to try to reduce memory usage
+     *
+     *
+     * @param copyThreshold
+     *            the threshold (in bytes) or {@link Integer#MAX_VALUE} to
+     *            disable it. The value must be at least 0
+     * @throws IllegalStateException
+     *             get thrown if someone tries to change this setting after the
+     *             Decoder was added to the {@link ChannelPipeline}
+     */
+    public final void setMaxUnusedBufferCapacity(int copyThreshold) {
+        if (copyThreshold < 0) {
+            throw new IllegalArgumentException("MaxUnusedBufferCapacity must be >= 0");
+        }
+        if (ctx == null) {
+            this.copyThreshold = copyThreshold;
+        } else {
+            throw new IllegalStateException("MaxWastedBufferCapacity " +
+                    "can only be changed before the Decoder was added to the ChannelPipeline");
+        }
+    }
+
+    /**
+     * Returns a compact slice of this buffer's readable bytes.
+     *
+     * The returned buffer may or may not share the content area with the buffer
+     * given as an argument while they maintain separate indexes and marks.
+     * If more than the maximal unused buffer capacity is unused then the
+     * content is copied to a new buffer to conserve memory.
+     *
+     * @param buffer ChannelBuffer to compact
+     * @return a compact slice of buffer
+     */
+    private ChannelBuffer compactBuffer(ChannelBuffer buffer) {
+        if (buffer.capacity() - buffer.readableBytes() > copyThreshold) {
+            ChannelBuffer copy = newCumulationBuffer(ctx, buffer.readableBytes());
+            copy.writeBytes(buffer);
+            return copy;
+        } else {
+            return buffer.slice();
+        }
+    }
+
     @Override
     public void messageReceived(
             ChannelHandlerContext ctx, MessageEvent e) throws Exception {
@@ -218,23 +274,16 @@ public abstract class ZeroCopyFrameDecoder
                 callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
             } finally {
                 if (input.readable()) {
-                    // seems like there is something readable left in the input buffer. So create
-                    // the cumulation buffer and copy the input into it
-                    cumulation = new ArrayList(2);
-                    cumulation.add(input);
+                    // unread data is left so create a cumulation buffer
+                    cumulation = new ArrayList();
+                    cumulation.add(compactBuffer(input));
                 }
             }
-
         } else {
-            if (!cumulation.get(0).order().equals(input.order())) {
-                throw new IllegalArgumentException(
-                        "inconsistent byte order");
-            }
-
-            cumulation.add(input);
+            cumulation.add(compactBuffer(input));
 
             CompositeChannelBuffer buf =
-                    new CompositeChannelBuffer(input.order(), cumulation, false);
+                    new CompositeChannelBuffer(cumulation.get(0).order(), cumulation, false);
 
             // Wrap in try / finally.
             //
@@ -245,11 +294,26 @@ public abstract class ZeroCopyFrameDecoder
                 if (!buf.readable()) {
                     // nothing readable left so reset the state
                     cumulation = null;
-                } else {
-                    cumulation = buf.decompose(buf.readerIndex(), buf.readableBytes());
+                } else if (buf.readableBytes() != buf.capacity()) {
+                    // part of the buffer was read, but not all
+                    int read = buf.capacity() - buf.readableBytes();
+
+                    // get rid of fully read leading buffers
+                    int i = 0;
+                    while (read >= cumulation.get(i).readableBytes()) {
+                        read -= cumulation.get(i).readableBytes();
+                        i++;
+                    }
+                    cumulation.subList(0, i).clear();
+
+                    // compact partially read leading buffer
+                    if (read > 0) {
+                        ChannelBuffer first = cumulation.get(0);
+                        first.readerIndex(read);
+                        cumulation.set(0, compactBuffer(first));
+                    }
                 }
             }
-
         }
     }
 
@@ -391,10 +455,10 @@ public abstract class ZeroCopyFrameDecoder
      * @param ctx {@link ChannelHandlerContext} for this handler
      * @return buffer the {@link ChannelBuffer} which is used for cumulation
      */
-    @Deprecated
     protected ChannelBuffer newCumulationBuffer(
             ChannelHandlerContext ctx, int minimumCapacity) {
-        return null;
+        ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
+        return factory.getBuffer(minimumCapacity);
     }
 
     /**