From 1311a2edc1fd66b4dfc8a31a1894d69c77ce9aca Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Sun, 24 Jun 2012 22:12:08 +0900 Subject: [PATCH] Simplify FrameDecoder and ReplayingDecoder --- .../netty/buffer/CompositeChannelBuffer.java | 4 + .../handler/codec/frame/FrameDecoder.java | 107 ++-- .../codec/frame/ZeroCopyFrameDecoder.java | 532 ------------------ .../codec/replay/ReplayingDecoder.java | 98 +--- .../codec/replay/ReplayingDecoderBuffer.java | 153 +++-- 5 files changed, 150 insertions(+), 744 deletions(-) delete mode 100644 src/main/java/org/jboss/netty/handler/codec/frame/ZeroCopyFrameDecoder.java diff --git a/src/main/java/org/jboss/netty/buffer/CompositeChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/CompositeChannelBuffer.java index 3c84532832..b9bad60bf4 100644 --- a/src/main/java/org/jboss/netty/buffer/CompositeChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/CompositeChannelBuffer.java @@ -174,6 +174,10 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer { return indices[components.length]; } + public int numComponents() { + return components.length; + } + public byte getByte(int index) { int componentId = componentId(index); return components[componentId].getByte(index - indices[componentId]); diff --git a/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java b/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java index 804a1607b1..956174a5f8 100644 --- a/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java @@ -250,79 +250,66 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler implemen } if (cumulation == null) { - // Wrap in try / finally. - // - // See https://github.com/netty/netty/issues/364 try { // the cumulation buffer is not created yet so just pass the input to callDecode(...) method callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); } finally { - int readable = input.readableBytes(); - - if (readable > 0) { - int cap = input.capacity(); - - // check if readableBytes == capacity we can safe the copy as we will not be able to - // optimize memory usage anyway - if (readable != cap && cap > copyThreshold) { - // seems like there is something readable left in the input buffer. So create - // the cumulation buffer and copy the input into it - cumulation = newCumulationBuffer(ctx, input.readableBytes()); - cumulation.writeBytes(input); - } else { - // just use the input as cumulation buffer for now - cumulation = input; - } - - } + updateCumulation(ctx, input); } } else { - assert cumulation.readable(); - - // wrap the cumulation and input - ChannelBuffer buf = ChannelBuffers.wrappedBuffer(cumulation, input); - cumulation = buf; - - // Wrap in try / finally. - // - // See https://github.com/netty/netty/issues/364 + input = appendToCumulation(input); try { - callDecode(ctx, e.getChannel(), buf, e.getRemoteAddress()); + callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); } finally { - int readable = buf.readableBytes(); - if (readable == 0) { - // nothing readable left so reset the state - cumulation = null; + updateCumulation(ctx, input); + } + } + } + + protected ChannelBuffer appendToCumulation(ChannelBuffer input) { + ChannelBuffer cumulation = this.cumulation; + assert cumulation.readable(); + if (cumulation instanceof CompositeChannelBuffer) { + // Make sure the resulting cumulation buffer has no more than 4 components. + CompositeChannelBuffer composite = (CompositeChannelBuffer) cumulation; + if (composite.numComponents() >= 4) { + cumulation = composite.copy(); + } + } + + this.cumulation = input = ChannelBuffers.wrappedBuffer(cumulation, input); + return input; + } + + protected ChannelBuffer updateCumulation(ChannelHandlerContext ctx, ChannelBuffer input) { + ChannelBuffer newCumulation; + int readableBytes = input.readableBytes(); + if (readableBytes > 0) { + int inputCapacity = input.capacity(); + + // If input.readableBytes() == input.capacity() (i.e. input is full), + // there's nothing to save from creating a new cumulation buffer + // even if input.capacity() exceeds the threshold, because the new cumulation + // buffer will have the same capacity and content with input. + if (readableBytes < inputCapacity && inputCapacity > copyThreshold) { + // At least one byte was consumed by callDecode() and input.capacity() + // exceeded the threshold. + cumulation = newCumulation = newCumulationBuffer(ctx, input.readableBytes()); + cumulation.writeBytes(input); + } else { + // Nothing was consumed by callDecode() or input.capacity() did not + // exceed the threshold. + if (input.readerIndex() != 0) { + cumulation = newCumulation = input.slice(); } else { - int cap = buf.capacity(); - - if (readable != cap && cap > copyThreshold) { - // the readable bytes are > as the configured - // copyThreshold, so create a new buffer and copy the - // bytes into it - cumulation = newCumulationBuffer(ctx, buf.readableBytes()); - cumulation.writeBytes(buf); - - } else { - if (readable == cap) { - cumulation = buf; - } else { - // create a new cumulation buffer that holds the - // unwrapped parts of the CompositeChannelBuffer - // that are not read yet. - cumulation = ChannelBuffers.wrappedBuffer(((CompositeChannelBuffer) buf) - .decompose(buf.readerIndex(), buf.readableBytes()) - .toArray(new ChannelBuffer[0])); - - } - - } - + cumulation = newCumulation = input; } } - + } else { + cumulation = newCumulation = null; } + return newCumulation; } @Override diff --git a/src/main/java/org/jboss/netty/handler/codec/frame/ZeroCopyFrameDecoder.java b/src/main/java/org/jboss/netty/handler/codec/frame/ZeroCopyFrameDecoder.java deleted file mode 100644 index e74bc0f63c..0000000000 --- a/src/main/java/org/jboss/netty/handler/codec/frame/ZeroCopyFrameDecoder.java +++ /dev/null @@ -1,532 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.handler.codec.frame; - -import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.List; - -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferFactory; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.buffer.CompositeChannelBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandler; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.ChannelUpstreamHandler; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.LifeCycleAwareChannelHandler; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.handler.codec.replay.ReplayingDecoder; - -/** - * Decodes the received {@link ChannelBuffer}s into a meaningful frame object. - *

- * In a stream-based transport such as TCP/IP, packets can be fragmented and - * reassembled during transmission even in a LAN environment. For example, - * let us assume you have received three packets: - *

- * +-----+-----+-----+
- * | ABC | DEF | GHI |
- * +-----+-----+-----+
- * 
- * because of the packet fragmentation, a server can receive them like the - * following: - *
- * +----+-------+---+---+
- * | AB | CDEFG | H | I |
- * +----+-------+---+---+
- * 
- *

- * {@link ZeroCopyFrameDecoder} helps you defrag the received packets into one or more - * meaningful frames that could be easily understood by the - * application logic. In case of the example above, your {@link ZeroCopyFrameDecoder} - * implementation could defrag the received packets like the following: - *

- * +-----+-----+-----+
- * | ABC | DEF | GHI |
- * +-----+-----+-----+
- * 
- *

- * The following code shows an example handler which decodes a frame whose - * first 4 bytes header represents the length of the frame, excluding the - * header. - *

- * MESSAGE FORMAT
- * ==============
- *
- * Offset:  0        4                   (Length + 4)
- *          +--------+------------------------+
- * Fields:  | Length | Actual message content |
- *          +--------+------------------------+
- *
- * DECODER IMPLEMENTATION
- * ======================
- *
- * public class IntegerHeaderFrameDecoder extends {@link ZeroCopyFrameDecoder} {
- *
- *   {@code @Override}
- *   protected Object decode({@link ChannelHandlerContext} ctx,
- *                           {@link Channel} channel,
- *                           {@link ChannelBuffer} buf) throws Exception {
- *
- *     // Make sure if the length field was received.
- *     if (buf.readableBytes() < 4) {
- *        // The length field was not received yet - return null.
- *        // This method will be invoked again when more packets are
- *        // received and appended to the buffer.
- *        return null;
- *     }
- *
- *     // The length field is in the buffer.
- *
- *     // Mark the current buffer position before reading the length field
- *     // because the whole frame might not be in the buffer yet.
- *     // We will reset the buffer position to the marked position if
- *     // there's not enough bytes in the buffer.
- *     buf.markReaderIndex();
- *
- *     // Read the length field.
- *     int length = buf.readInt();
- *
- *     // Make sure if there's enough bytes in the buffer.
- *     if (buf.readableBytes() < length) {
- *        // The whole bytes were not received yet - return null.
- *        // This method will be invoked again when more packets are
- *        // received and appended to the buffer.
- *
- *        // Reset to the marked position to read the length field again
- *        // next time.
- *        buf.resetReaderIndex();
- *
- *        return null;
- *     }
- *
- *     // There's enough bytes in the buffer. Read it.
- *     {@link ChannelBuffer} frame = buf.readBytes(length);
- *
- *     // Successfully decoded a frame.  Return the decoded frame.
- *     return frame;
- *   }
- * }
- * 
- * - *

Returning a POJO rather than a {@link ChannelBuffer}

- *

- * Please note that you can return an object of a different type than - * {@link ChannelBuffer} in your {@code decode()} and {@code decodeLast()} - * implementation. For example, you could return a - * POJO so that the next - * {@link ChannelUpstreamHandler} receives a {@link MessageEvent} which - * contains a POJO rather than a {@link ChannelBuffer}. - * - *

Replacing a decoder with another decoder in a pipeline

- *

- * If you are going to write a protocol multiplexer, you will probably want to - * replace a {@link ZeroCopyFrameDecoder} (protocol detector) with another - * {@link ZeroCopyFrameDecoder} or {@link ReplayingDecoder} (actual protocol decoder). - * It is not possible to achieve this simply by calling - * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but - * some additional steps are required: - *

- * public class FirstDecoder extends {@link ZeroCopyFrameDecoder} {
- *
- *     public FirstDecoder() {
- *         super(true); // Enable unfold
- *     }
- *
- *     {@code @Override}
- *     protected Object decode({@link ChannelHandlerContext} ctx,
- *                             {@link Channel} channel,
- *                             {@link ChannelBuffer} buf) {
- *         ...
- *         // Decode the first message
- *         Object firstMessage = ...;
- *
- *         // Add the second decoder
- *         ctx.getPipeline().addLast("second", new SecondDecoder());
- *
- *         // Remove the first decoder (me)
- *         ctx.getPipeline().remove(this);
- *
- *         if (buf.readable()) {
- *             // Hand off the remaining data to the second decoder
- *             return new Object[] { firstMessage, buf.readBytes(buf.readableBytes()) };
- *         } else {
- *             // Nothing to hand off
- *             return firstMessage;
- *         }
- *     }
- * }
- * 
- * - * @apiviz.landmark - */ -public abstract class ZeroCopyFrameDecoder - extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler { - - private final boolean unfold; - protected List cumulation; - private volatile ChannelHandlerContext ctx; - private int copyThreshold; - - protected ZeroCopyFrameDecoder() { - this(false); - } - - protected ZeroCopyFrameDecoder(boolean unfold) { - this.unfold = unfold; - } - - /** - * Set the maximal unused capacity of the internal cumulation ChannelBuffer - * before the {@link ZeroCopyFrameDecoder} tries to minimize the memory usage by - * "byte copy". - * - * - * What you use here really depends on your application and need. Using - * {@link Integer#MAX_VALUE} will disable all byte copies but give you the - * cost of a higher memory usage if big {@link ChannelBuffer}'s will be - * received. - * - * By default a threshold of 0 is used, which means it will - * always copy to try to reduce memory usage - * - * - * @param copyThreshold - * the threshold (in bytes) or {@link Integer#MAX_VALUE} to - * disable it. The value must be at least 0 - * @throws IllegalStateException - * get thrown if someone tries to change this setting after the - * Decoder was added to the {@link ChannelPipeline} - */ - public final void setMaxUnusedBufferCapacity(int copyThreshold) { - if (copyThreshold < 0) { - throw new IllegalArgumentException("MaxUnusedBufferCapacity must be >= 0"); - } - if (ctx == null) { - this.copyThreshold = copyThreshold; - } else { - throw new IllegalStateException("MaxWastedBufferCapacity " + - "can only be changed before the Decoder was added to the ChannelPipeline"); - } - } - - /** - * Returns a compact slice of this buffer's readable bytes. - * - * The returned buffer may or may not share the content area with the buffer - * given as an argument while they maintain separate indexes and marks. - * If more than the maximal unused buffer capacity is unused then the - * content is copied to a new buffer to conserve memory. - * - * @param buffer ChannelBuffer to compact - * @return a compact slice of buffer - */ - private ChannelBuffer compactBuffer(ChannelBuffer buffer) { - if (buffer.capacity() - buffer.readableBytes() > copyThreshold) { - ChannelBuffer copy = newCumulationBuffer(ctx, buffer.readableBytes()); - copy.writeBytes(buffer); - return copy; - } else { - return buffer.slice(); - } - } - - @Override - public void messageReceived( - ChannelHandlerContext ctx, MessageEvent e) throws Exception { - - Object m = e.getMessage(); - if (!(m instanceof ChannelBuffer)) { - ctx.sendUpstream(e); - return; - } - - ChannelBuffer input = (ChannelBuffer) m; - if (!input.readable()) { - return; - } - - if (cumulation == null) { - // Wrap in try / finally. - // - // See https://github.com/netty/netty/issues/364 - try { - // the cumulation buffer is not created yet so just pass the input to callDecode(...) method - callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); - } finally { - if (input.readable()) { - // unread data is left so create a cumulation buffer - cumulation = new ArrayList(); - cumulation.add(compactBuffer(input)); - } - } - } else { - cumulation.add(compactBuffer(input)); - - CompositeChannelBuffer buf = - new CompositeChannelBuffer(cumulation.get(0).order(), cumulation, false); - - // Wrap in try / finally. - // - // See https://github.com/netty/netty/issues/364 - try { - callDecode(ctx, e.getChannel(), buf, e.getRemoteAddress()); - } finally { - if (!buf.readable()) { - // nothing readable left so reset the state - cumulation = null; - } else if (buf.readableBytes() != buf.capacity()) { - // part of the buffer was read, but not all - int read = buf.capacity() - buf.readableBytes(); - - // get rid of fully read leading buffers - int i = 0; - while (read >= cumulation.get(i).readableBytes()) { - read -= cumulation.get(i).readableBytes(); - i++; - } - cumulation.subList(0, i).clear(); - - // compact partially read leading buffer - if (read > 0) { - ChannelBuffer first = cumulation.get(0); - first.readerIndex(read); - cumulation.set(0, compactBuffer(first)); - } - } - } - } - } - - @Override - public void channelDisconnected( - ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - cleanup(ctx, e); - } - - @Override - public void channelClosed( - ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - cleanup(ctx, e); - } - - @Override - public void exceptionCaught( - ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { - ctx.sendUpstream(e); - } - - /** - * Decodes the received packets so far into a frame. - * - * @param ctx the context of this handler - * @param channel the current channel - * @param buffer the cumulative buffer of received packets so far. - * Note that the buffer might be empty, which means you - * should not make an assumption that the buffer contains - * at least one byte in your decoder implementation. - * - * @return the decoded frame if a full frame was received and decoded. - * {@code null} if there's not enough data in the buffer to decode a frame. - */ - protected abstract Object decode( - ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception; - - /** - * Decodes the received data so far into a frame when the channel is - * disconnected. - * - * @param ctx the context of this handler - * @param channel the current channel - * @param buffer the cumulative buffer of received packets so far. - * Note that the buffer might be empty, which means you - * should not make an assumption that the buffer contains - * at least one byte in your decoder implementation. - * - * @return the decoded frame if a full frame was received and decoded. - * {@code null} if there's not enough data in the buffer to decode a frame. - */ - protected Object decodeLast( - ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { - return decode(ctx, channel, buffer); - } - - private void callDecode( - ChannelHandlerContext context, Channel channel, - ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception { - - while (cumulation.readable()) { - int oldReaderIndex = cumulation.readerIndex(); - Object frame = decode(context, channel, cumulation); - if (frame == null) { - if (oldReaderIndex == cumulation.readerIndex()) { - // Seems like more data is required. - // Let us wait for the next notification. - break; - } else { - // Previous data has been discarded. - // Probably it is reading on. - continue; - } - } else if (oldReaderIndex == cumulation.readerIndex()) { - throw new IllegalStateException( - "decode() method must read at least one byte " + - "if it returned a frame (caused by: " + getClass() + ")"); - } - - unfoldAndFireMessageReceived(context, remoteAddress, frame); - } - } - - protected final void unfoldAndFireMessageReceived( - ChannelHandlerContext context, SocketAddress remoteAddress, Object result) { - if (unfold) { - if (result instanceof Object[]) { - for (Object r: (Object[]) result) { - Channels.fireMessageReceived(context, r, remoteAddress); - } - } else if (result instanceof Iterable) { - for (Object r: (Iterable) result) { - Channels.fireMessageReceived(context, r, remoteAddress); - } - } else { - Channels.fireMessageReceived(context, result, remoteAddress); - } - } else { - Channels.fireMessageReceived(context, result, remoteAddress); - } - } - - /** - * Gets called on {@link #channelDisconnected(ChannelHandlerContext, ChannelStateEvent)} and - * {@link #channelClosed(ChannelHandlerContext, ChannelStateEvent)} - */ - protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception { - try { - List cumulation = this.cumulation; - if (cumulation == null) { - return; - } - - this.cumulation = null; - - CompositeChannelBuffer buf = - new CompositeChannelBuffer(cumulation.get(0).order(), cumulation, false); - - // Make sure all frames are read before notifying a closed channel. - callDecode(ctx, ctx.getChannel(), buf, null); - - // Call decodeLast() finally. Please note that decodeLast() is - // called even if there's nothing more to read from the buffer to - // notify a user that the connection was closed explicitly. - Object partialFrame = decodeLast(ctx, ctx.getChannel(), buf); - if (partialFrame != null) { - unfoldAndFireMessageReceived(ctx, null, partialFrame); - } - } finally { - ctx.sendUpstream(e); - } - } - - /** - * Create a new {@link ChannelBuffer} which is used for the cumulation. - * Sub-classes may override this. - * - * @param ctx {@link ChannelHandlerContext} for this handler - * @return buffer the {@link ChannelBuffer} which is used for cumulation - */ - protected ChannelBuffer newCumulationBuffer( - ChannelHandlerContext ctx, int minimumCapacity) { - ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory(); - return factory.getBuffer(minimumCapacity); - } - - /** - * Replace this {@link ZeroCopyFrameDecoder} in the {@link ChannelPipeline} with the given {@link ChannelHandler}. - * All remaining bytes in the {@link ChannelBuffer} will get send to the new {@link ChannelHandler} that was used - * as replacement - * - */ - public void replace(String handlerName, ChannelHandler handler) { - if (ctx == null) { - throw new IllegalStateException( - "Replace cann only be called once the FrameDecoder is added to the ChannelPipeline"); - } - ChannelPipeline pipeline = ctx.getPipeline(); - pipeline.addAfter(ctx.getName(), handlerName, handler); - - try { - if (cumulation != null) { - CompositeChannelBuffer buf = - new CompositeChannelBuffer(cumulation.get(0).order(), cumulation, false); - Channels.fireMessageReceived(ctx, buf); - cumulation = null; - } - } finally { - pipeline.remove(this); - } - - } - - /** - * Returns the actual number of readable bytes in the internal cumulative - * buffer of this decoder. You usually do not need to rely on this value - * to write a decoder. Use it only when you muse use it at your own risk. - * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}. - */ - protected int actualReadableBytes() { - return internalBuffer().readableBytes(); - } - - - - /** - * Returns the internal cumulative buffer of this decoder. You usually - * do not need to access the internal buffer directly to write a decoder. - * Use it only when you must use it at your own risk. - */ - protected ChannelBuffer internalBuffer() { - List buf = cumulation; - if (buf == null) { - return ChannelBuffers.EMPTY_BUFFER; - } - return new CompositeChannelBuffer(cumulation.get(0).order(), cumulation, false); - } - - public void beforeAdd(ChannelHandlerContext ctx) throws Exception { - this.ctx = ctx; - } - - public void afterAdd(ChannelHandlerContext ctx) throws Exception { - // Nothing to do.. - } - - public void beforeRemove(ChannelHandlerContext ctx) throws Exception { - // Nothing to do.. - } - - public void afterRemove(ChannelHandlerContext ctx) throws Exception { - // Nothing to do.. - } - -} diff --git a/src/main/java/org/jboss/netty/handler/codec/replay/ReplayingDecoder.java b/src/main/java/org/jboss/netty/handler/codec/replay/ReplayingDecoder.java index 27849a72f1..8b7c19328e 100644 --- a/src/main/java/org/jboss/netty/handler/codec/replay/ReplayingDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/replay/ReplayingDecoder.java @@ -18,8 +18,6 @@ package org.jboss.netty.handler.codec.replay; import java.net.SocketAddress; import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.buffer.CompositeChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandler; import org.jboss.netty.channel.ChannelHandlerContext; @@ -286,7 +284,7 @@ public abstract class ReplayingDecoder> extends FrameDecoder { - private ReplayingDecoderBuffer replayable; + private final ReplayingDecoderBuffer replayable = new ReplayingDecoderBuffer(this); private T state; private int checkpoint; private boolean needsCleanup; @@ -315,6 +313,11 @@ public abstract class ReplayingDecoder> this.state = initialState; } + @Override + protected ChannelBuffer internalBuffer() { + return super.internalBuffer(); + } + /** * Stores the internal cumulative buffer's reader position. */ @@ -426,30 +429,24 @@ public abstract class ReplayingDecoder> // the cumulation buffer is not created yet so just pass the input // to callDecode(...) method cumulation = input; - replayable = new ReplayingDecoderBuffer(input); int oldReaderIndex = input.readerIndex(); int inputSize = input.readableBytes(); - // Wrap in try / finally. - // - // See https://github.com/netty/netty/issues/364 try { callDecode( ctx, e.getChannel(), input, replayable, e.getRemoteAddress()); } finally { - int readable = input.readableBytes(); - - if (readable > 0) { - int cap = input.capacity(); - boolean copy = false; + int readableBytes = input.readableBytes(); + if (readableBytes > 0) { + int inputCapacity = input.capacity(); // check if readableBytes == capacity we can safe the copy as we will not be able to // optimize memory usage anyway - if (readable != cap && cap > getMaxCumulationBufferCapacity()) { - copy = true; - } + boolean copy = + readableBytes != inputCapacity && + inputCapacity > getMaxCumulationBufferCapacity(); // seems like there is something readable left in the input buffer // or decoder wants a replay - create the cumulation buffer and @@ -458,86 +455,39 @@ public abstract class ReplayingDecoder> if (checkpoint > 0) { int bytesToPreserve = inputSize - (checkpoint - oldReaderIndex); if (copy) { - cumulation = this.cumulation = - newCumulationBuffer(ctx, bytesToPreserve); + this.cumulation = cumulation = newCumulationBuffer(ctx, bytesToPreserve); cumulation.writeBytes(input, checkpoint, bytesToPreserve); } else { - cumulation = this.cumulation = - input.slice(checkpoint, bytesToPreserve); + this.cumulation = cumulation = input.slice(checkpoint, bytesToPreserve); } } else if (checkpoint == 0) { if (copy) { - cumulation = this.cumulation = - newCumulationBuffer(ctx, inputSize); + this.cumulation = cumulation = newCumulationBuffer(ctx, inputSize); cumulation.writeBytes(input, oldReaderIndex, inputSize); cumulation.readerIndex(input.readerIndex()); } else { - cumulation = this.cumulation = - input.slice(oldReaderIndex, inputSize); + this.cumulation = cumulation = input.slice(oldReaderIndex, inputSize); cumulation.readerIndex(input.readerIndex()); } } else { if (copy) { - cumulation = this.cumulation = - newCumulationBuffer(ctx, input.readableBytes()); + this.cumulation = cumulation = newCumulationBuffer(ctx, input.readableBytes()); cumulation.writeBytes(input); } else { - cumulation = this.cumulation = - input; + this.cumulation = cumulation = input; } } - replayable = new ReplayingDecoderBuffer(cumulation); } else { cumulation = null; - replayable = ReplayingDecoderBuffer.EMPTY_BUFFER; } } - } else { - assert cumulation.readable(); - // wrap the cumulation and input - ChannelBuffer buf = ChannelBuffers.wrappedBuffer(cumulation, input); - cumulation = buf; - replayable = new ReplayingDecoderBuffer(cumulation); - - // Wrap in try / finally. - // - // See https://github.com/netty/netty/issues/364 + input = appendToCumulation(input); try { - callDecode(ctx, e.getChannel(), buf, replayable, e.getRemoteAddress()); + callDecode(ctx, e.getChannel(), input, replayable, e.getRemoteAddress()); } finally { - int readable = buf.readableBytes(); - if (readable == 0) { - // nothing readable left so reset the state - cumulation = null; - replayable = ReplayingDecoderBuffer.EMPTY_BUFFER; - } else { - int cap = buf.capacity(); - - if (readable != cap && cap > getMaxCumulationBufferCapacity()) { - // the readable bytes are > as the configured - // copyThreshold, so create a new buffer and copy the - // bytes into it - cumulation = newCumulationBuffer(ctx, buf.readableBytes()); - cumulation.writeBytes(buf); - - } else { - if (readable == cap) { - cumulation = buf; - } else { - // create a new cumulation buffer that holds the - // unwrapped parts of the CompositeChannelBuffer - // that are not read yet. - cumulation = ChannelBuffers.wrappedBuffer(((CompositeChannelBuffer) buf) - .decompose(buf.readerIndex(), buf.readableBytes()) - .toArray(new ChannelBuffer[0])); - } - - } - replayable = new ReplayingDecoderBuffer(cumulation); - } + updateCumulation(ctx, input); } - } } @@ -600,7 +550,6 @@ public abstract class ReplayingDecoder> needsCleanup = false; } - this.cumulation = null; replayable.terminate(); if (cumulation != null && cumulation.readable()) { @@ -612,15 +561,16 @@ public abstract class ReplayingDecoder> // called even if there's nothing more to read from the buffer to // notify a user that the connection was closed explicitly. Object partiallyDecoded = decodeLast(ctx, e.getChannel(), replayable, state); + + this.cumulation = null; + if (partiallyDecoded != null) { unfoldAndFireMessageReceived(ctx, null, partiallyDecoded); } } catch (ReplayError replay) { // Ignore } finally { - replayable = ReplayingDecoderBuffer.EMPTY_BUFFER; ctx.sendUpstream(e); } } - } diff --git a/src/main/java/org/jboss/netty/handler/codec/replay/ReplayingDecoderBuffer.java b/src/main/java/org/jboss/netty/handler/codec/replay/ReplayingDecoderBuffer.java index 94764279a3..d884a58637 100644 --- a/src/main/java/org/jboss/netty/handler/codec/replay/ReplayingDecoderBuffer.java +++ b/src/main/java/org/jboss/netty/handler/codec/replay/ReplayingDecoderBuffer.java @@ -27,23 +27,20 @@ import java.nio.charset.Charset; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBufferFactory; import org.jboss.netty.buffer.ChannelBufferIndexFinder; -import org.jboss.netty.buffer.ChannelBuffers; class ReplayingDecoderBuffer implements ChannelBuffer { private static final Error REPLAY = new ReplayError(); - private final ChannelBuffer buffer; + private final ReplayingDecoder parent; private boolean terminated; - public static ReplayingDecoderBuffer EMPTY_BUFFER = new ReplayingDecoderBuffer(ChannelBuffers.EMPTY_BUFFER); - - static { - EMPTY_BUFFER.terminate(); + ReplayingDecoderBuffer(ReplayingDecoder parent) { + this.parent = parent; } - ReplayingDecoderBuffer(ChannelBuffer buffer) { - this.buffer = buffer; + private ChannelBuffer buf() { + return parent.internalBuffer(); } void terminate() { @@ -52,14 +49,14 @@ class ReplayingDecoderBuffer implements ChannelBuffer { public int capacity() { if (terminated) { - return buffer.capacity(); + return buf().capacity(); } else { return Integer.MAX_VALUE; } } public boolean isDirect() { - return buffer.isDirect(); + return buf().isDirect(); } public boolean hasArray() { @@ -93,7 +90,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer { public ChannelBuffer copy(int index, int length) { checkIndex(index, length); - return buffer.copy(index, length); + return buf().copy(index, length); } public void discardReadBytes() { @@ -110,22 +107,22 @@ class ReplayingDecoderBuffer implements ChannelBuffer { public byte getByte(int index) { checkIndex(index); - return buffer.getByte(index); + return buf().getByte(index); } public short getUnsignedByte(int index) { checkIndex(index); - return buffer.getUnsignedByte(index); + return buf().getUnsignedByte(index); } public void getBytes(int index, byte[] dst, int dstIndex, int length) { checkIndex(index, length); - buffer.getBytes(index, dst, dstIndex, length); + buf().getBytes(index, dst, dstIndex, length); } public void getBytes(int index, byte[] dst) { checkIndex(index, dst.length); - buffer.getBytes(index, dst); + buf().getBytes(index, dst); } public void getBytes(int index, ByteBuffer dst) { @@ -134,7 +131,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer { public void getBytes(int index, ChannelBuffer dst, int dstIndex, int length) { checkIndex(index, length); - buffer.getBytes(index, dst, dstIndex, length); + buf().getBytes(index, dst, dstIndex, length); } public void getBytes(int index, ChannelBuffer dst, int length) { @@ -157,52 +154,52 @@ class ReplayingDecoderBuffer implements ChannelBuffer { public int getInt(int index) { checkIndex(index, 4); - return buffer.getInt(index); + return buf().getInt(index); } public long getUnsignedInt(int index) { checkIndex(index, 4); - return buffer.getUnsignedInt(index); + return buf().getUnsignedInt(index); } public long getLong(int index) { checkIndex(index, 8); - return buffer.getLong(index); + return buf().getLong(index); } public int getMedium(int index) { checkIndex(index, 3); - return buffer.getMedium(index); + return buf().getMedium(index); } public int getUnsignedMedium(int index) { checkIndex(index, 3); - return buffer.getUnsignedMedium(index); + return buf().getUnsignedMedium(index); } public short getShort(int index) { checkIndex(index, 2); - return buffer.getShort(index); + return buf().getShort(index); } public int getUnsignedShort(int index) { checkIndex(index, 2); - return buffer.getUnsignedShort(index); + return buf().getUnsignedShort(index); } public char getChar(int index) { checkIndex(index, 2); - return buffer.getChar(index); + return buf().getChar(index); } public float getFloat(int index) { checkIndex(index, 4); - return buffer.getFloat(index); + return buf().getFloat(index); } public double getDouble(int index) { checkIndex(index, 8); - return buffer.getDouble(index); + return buf().getDouble(index); } @Override @@ -211,7 +208,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer { } public int indexOf(int fromIndex, int toIndex, byte value) { - int endIndex = buffer.indexOf(fromIndex, toIndex, value); + int endIndex = buf().indexOf(fromIndex, toIndex, value); if (endIndex < 0) { throw REPLAY; } @@ -220,7 +217,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer { public int indexOf(int fromIndex, int toIndex, ChannelBufferIndexFinder indexFinder) { - int endIndex = buffer.indexOf(fromIndex, toIndex, indexFinder); + int endIndex = buf().indexOf(fromIndex, toIndex, indexFinder); if (endIndex < 0) { throw REPLAY; } @@ -228,7 +225,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer { } public int bytesBefore(byte value) { - int bytes = buffer.bytesBefore(value); + int bytes = buf().bytesBefore(value); if (bytes < 0) { throw REPLAY; } @@ -236,7 +233,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer { } public int bytesBefore(ChannelBufferIndexFinder indexFinder) { - int bytes = buffer.bytesBefore(indexFinder); + int bytes = buf().bytesBefore(indexFinder); if (bytes < 0) { throw REPLAY; } @@ -245,7 +242,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer { public int bytesBefore(int length, byte value) { checkReadableBytes(length); - int bytes = buffer.bytesBefore(length, value); + int bytes = buf().bytesBefore(length, value); if (bytes < 0) { throw REPLAY; } @@ -254,7 +251,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer { public int bytesBefore(int length, ChannelBufferIndexFinder indexFinder) { checkReadableBytes(length); - int bytes = buffer.bytesBefore(length, indexFinder); + int bytes = buf().bytesBefore(length, indexFinder); if (bytes < 0) { throw REPLAY; } @@ -262,7 +259,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer { } public int bytesBefore(int index, int length, byte value) { - int bytes = buffer.bytesBefore(index, length, value); + int bytes = buf().bytesBefore(index, length, value); if (bytes < 0) { throw REPLAY; } @@ -271,7 +268,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer { public int bytesBefore(int index, int length, ChannelBufferIndexFinder indexFinder) { - int bytes = buffer.bytesBefore(index, length, indexFinder); + int bytes = buf().bytesBefore(index, length, indexFinder); if (bytes < 0) { throw REPLAY; } @@ -279,7 +276,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer { } public void markReaderIndex() { - buffer.markReaderIndex(); + buf().markReaderIndex(); } public void markWriterIndex() { @@ -287,43 +284,43 @@ class ReplayingDecoderBuffer implements ChannelBuffer { } public ChannelBufferFactory factory() { - return buffer.factory(); + return buf().factory(); } public ByteOrder order() { - return buffer.order(); + return buf().order(); } public boolean readable() { - return terminated? buffer.readable() : true; + return terminated? buf().readable() : true; } public int readableBytes() { if (terminated) { - return buffer.readableBytes(); + return buf().readableBytes(); } else { - return Integer.MAX_VALUE - buffer.readerIndex(); + return Integer.MAX_VALUE - buf().readerIndex(); } } public byte readByte() { checkReadableBytes(1); - return buffer.readByte(); + return buf().readByte(); } public short readUnsignedByte() { checkReadableBytes(1); - return buffer.readUnsignedByte(); + return buf().readUnsignedByte(); } public void readBytes(byte[] dst, int dstIndex, int length) { checkReadableBytes(length); - buffer.readBytes(dst, dstIndex, length); + buf().readBytes(dst, dstIndex, length); } public void readBytes(byte[] dst) { checkReadableBytes(dst.length); - buffer.readBytes(dst); + buf().readBytes(dst); } public void readBytes(ByteBuffer dst) { @@ -332,7 +329,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer { public void readBytes(ChannelBuffer dst, int dstIndex, int length) { checkReadableBytes(length); - buffer.readBytes(dst, dstIndex, length); + buf().readBytes(dst, dstIndex, length); } public void readBytes(ChannelBuffer dst, int length) { @@ -345,11 +342,11 @@ class ReplayingDecoderBuffer implements ChannelBuffer { @Deprecated public ChannelBuffer readBytes(ChannelBufferIndexFinder endIndexFinder) { - int endIndex = buffer.indexOf(buffer.readerIndex(), buffer.writerIndex(), endIndexFinder); + int endIndex = buf().indexOf(buf().readerIndex(), buf().writerIndex(), endIndexFinder); if (endIndex < 0) { throw REPLAY; } - return buffer.readBytes(endIndex - buffer.readerIndex()); + return buf().readBytes(endIndex - buf().readerIndex()); } public int readBytes(GatheringByteChannel out, int length) @@ -359,22 +356,22 @@ class ReplayingDecoderBuffer implements ChannelBuffer { public ChannelBuffer readBytes(int length) { checkReadableBytes(length); - return buffer.readBytes(length); + return buf().readBytes(length); } @Deprecated public ChannelBuffer readSlice( ChannelBufferIndexFinder endIndexFinder) { - int endIndex = buffer.indexOf(buffer.readerIndex(), buffer.writerIndex(), endIndexFinder); + int endIndex = buf().indexOf(buf().readerIndex(), buf().writerIndex(), endIndexFinder); if (endIndex < 0) { throw REPLAY; } - return buffer.readSlice(endIndex - buffer.readerIndex()); + return buf().readSlice(endIndex - buf().readerIndex()); } public ChannelBuffer readSlice(int length) { checkReadableBytes(length); - return buffer.readSlice(length); + return buf().readSlice(length); } public void readBytes(OutputStream out, int length) throws IOException { @@ -382,65 +379,65 @@ class ReplayingDecoderBuffer implements ChannelBuffer { } public int readerIndex() { - return buffer.readerIndex(); + return buf().readerIndex(); } public void readerIndex(int readerIndex) { - buffer.readerIndex(readerIndex); + buf().readerIndex(readerIndex); } public int readInt() { checkReadableBytes(4); - return buffer.readInt(); + return buf().readInt(); } public long readUnsignedInt() { checkReadableBytes(4); - return buffer.readUnsignedInt(); + return buf().readUnsignedInt(); } public long readLong() { checkReadableBytes(8); - return buffer.readLong(); + return buf().readLong(); } public int readMedium() { checkReadableBytes(3); - return buffer.readMedium(); + return buf().readMedium(); } public int readUnsignedMedium() { checkReadableBytes(3); - return buffer.readUnsignedMedium(); + return buf().readUnsignedMedium(); } public short readShort() { checkReadableBytes(2); - return buffer.readShort(); + return buf().readShort(); } public int readUnsignedShort() { checkReadableBytes(2); - return buffer.readUnsignedShort(); + return buf().readUnsignedShort(); } public char readChar() { checkReadableBytes(2); - return buffer.readChar(); + return buf().readChar(); } public float readFloat() { checkReadableBytes(4); - return buffer.readFloat(); + return buf().readFloat(); } public double readDouble() { checkReadableBytes(8); - return buffer.readDouble(); + return buf().readDouble(); } public void resetReaderIndex() { - buffer.resetReaderIndex(); + buf().resetReaderIndex(); } public void resetWriterIndex() { @@ -523,18 +520,18 @@ class ReplayingDecoderBuffer implements ChannelBuffer { @Deprecated public int skipBytes(ChannelBufferIndexFinder firstIndexFinder) { - int oldReaderIndex = buffer.readerIndex(); - int newReaderIndex = buffer.indexOf(oldReaderIndex, buffer.writerIndex(), firstIndexFinder); + int oldReaderIndex = buf().readerIndex(); + int newReaderIndex = buf().indexOf(oldReaderIndex, buf().writerIndex(), firstIndexFinder); if (newReaderIndex < 0) { throw REPLAY; } - buffer.readerIndex(newReaderIndex); + buf().readerIndex(newReaderIndex); return newReaderIndex - oldReaderIndex; } public void skipBytes(int length) { checkReadableBytes(length); - buffer.skipBytes(length); + buf().skipBytes(length); } public ChannelBuffer slice() { @@ -543,7 +540,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer { public ChannelBuffer slice(int index, int length) { checkIndex(index, length); - return buffer.slice(index, length); + return buf().slice(index, length); } public ByteBuffer toByteBuffer() { @@ -552,7 +549,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer { public ByteBuffer toByteBuffer(int index, int length) { checkIndex(index, length); - return buffer.toByteBuffer(index, length); + return buf().toByteBuffer(index, length); } public ByteBuffer[] toByteBuffers() { @@ -561,12 +558,12 @@ class ReplayingDecoderBuffer implements ChannelBuffer { public ByteBuffer[] toByteBuffers(int index, int length) { checkIndex(index, length); - return buffer.toByteBuffers(index, length); + return buf().toByteBuffers(index, length); } public String toString(int index, int length, Charset charset) { checkIndex(index, length); - return buffer.toString(index, length, charset); + return buf().toString(index, length, charset); } public String toString(Charset charsetName) { @@ -576,7 +573,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer { @Deprecated public String toString(int index, int length, String charsetName) { checkIndex(index, length); - return buffer.toString(index, length, charsetName); + return buf().toString(index, length, charsetName); } @Deprecated @@ -584,7 +581,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer { int index, int length, String charsetName, ChannelBufferIndexFinder terminatorFinder) { checkIndex(index, length); - return buffer.toString(index, length, charsetName, terminatorFinder); + return buf().toString(index, length, charsetName, terminatorFinder); } @Deprecated @@ -671,7 +668,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer { } public int writerIndex() { - return buffer.writerIndex(); + return buf().writerIndex(); } public void writerIndex(int writerIndex) { @@ -695,19 +692,19 @@ class ReplayingDecoderBuffer implements ChannelBuffer { } private void checkIndex(int index) { - if (index > buffer.writerIndex()) { + if (index > buf().writerIndex()) { throw REPLAY; } } private void checkIndex(int index, int length) { - if (index + length > buffer.writerIndex()) { + if (index + length > buf().writerIndex()) { throw REPLAY; } } private void checkReadableBytes(int readableBytes) { - if (buffer.readableBytes() < readableBytes) { + if (buf().readableBytes() < readableBytes) { throw REPLAY; } }