Simplify FrameDecoder and ReplayingDecoder
This commit is contained in:
parent
7f21daed77
commit
1311a2edc1
@ -174,6 +174,10 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer {
|
|||||||
return indices[components.length];
|
return indices[components.length];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int numComponents() {
|
||||||
|
return components.length;
|
||||||
|
}
|
||||||
|
|
||||||
public byte getByte(int index) {
|
public byte getByte(int index) {
|
||||||
int componentId = componentId(index);
|
int componentId = componentId(index);
|
||||||
return components[componentId].getByte(index - indices[componentId]);
|
return components[componentId].getByte(index - indices[componentId]);
|
||||||
|
@ -250,79 +250,66 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler implemen
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (cumulation == null) {
|
if (cumulation == null) {
|
||||||
// Wrap in try / finally.
|
|
||||||
//
|
|
||||||
// See https://github.com/netty/netty/issues/364
|
|
||||||
try {
|
try {
|
||||||
// the cumulation buffer is not created yet so just pass the input to callDecode(...) method
|
// the cumulation buffer is not created yet so just pass the input to callDecode(...) method
|
||||||
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
|
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
|
||||||
} finally {
|
} finally {
|
||||||
int readable = input.readableBytes();
|
updateCumulation(ctx, input);
|
||||||
|
}
|
||||||
|
|
||||||
if (readable > 0) {
|
} else {
|
||||||
int cap = input.capacity();
|
input = appendToCumulation(input);
|
||||||
|
try {
|
||||||
|
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
|
||||||
|
} finally {
|
||||||
|
updateCumulation(ctx, input);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// check if readableBytes == capacity we can safe the copy as we will not be able to
|
protected ChannelBuffer appendToCumulation(ChannelBuffer input) {
|
||||||
// optimize memory usage anyway
|
ChannelBuffer cumulation = this.cumulation;
|
||||||
if (readable != cap && cap > copyThreshold) {
|
assert cumulation.readable();
|
||||||
// seems like there is something readable left in the input buffer. So create
|
if (cumulation instanceof CompositeChannelBuffer) {
|
||||||
// the cumulation buffer and copy the input into it
|
// Make sure the resulting cumulation buffer has no more than 4 components.
|
||||||
cumulation = newCumulationBuffer(ctx, input.readableBytes());
|
CompositeChannelBuffer composite = (CompositeChannelBuffer) cumulation;
|
||||||
|
if (composite.numComponents() >= 4) {
|
||||||
|
cumulation = composite.copy();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.cumulation = input = ChannelBuffers.wrappedBuffer(cumulation, input);
|
||||||
|
return input;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ChannelBuffer updateCumulation(ChannelHandlerContext ctx, ChannelBuffer input) {
|
||||||
|
ChannelBuffer newCumulation;
|
||||||
|
int readableBytes = input.readableBytes();
|
||||||
|
if (readableBytes > 0) {
|
||||||
|
int inputCapacity = input.capacity();
|
||||||
|
|
||||||
|
// If input.readableBytes() == input.capacity() (i.e. input is full),
|
||||||
|
// there's nothing to save from creating a new cumulation buffer
|
||||||
|
// even if input.capacity() exceeds the threshold, because the new cumulation
|
||||||
|
// buffer will have the same capacity and content with input.
|
||||||
|
if (readableBytes < inputCapacity && inputCapacity > copyThreshold) {
|
||||||
|
// At least one byte was consumed by callDecode() and input.capacity()
|
||||||
|
// exceeded the threshold.
|
||||||
|
cumulation = newCumulation = newCumulationBuffer(ctx, input.readableBytes());
|
||||||
cumulation.writeBytes(input);
|
cumulation.writeBytes(input);
|
||||||
} else {
|
} else {
|
||||||
// just use the input as cumulation buffer for now
|
// Nothing was consumed by callDecode() or input.capacity() did not
|
||||||
cumulation = input;
|
// exceed the threshold.
|
||||||
}
|
if (input.readerIndex() != 0) {
|
||||||
|
cumulation = newCumulation = input.slice();
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
assert cumulation.readable();
|
cumulation = newCumulation = input;
|
||||||
|
}
|
||||||
// wrap the cumulation and input
|
}
|
||||||
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(cumulation, input);
|
|
||||||
cumulation = buf;
|
|
||||||
|
|
||||||
// Wrap in try / finally.
|
|
||||||
//
|
|
||||||
// See https://github.com/netty/netty/issues/364
|
|
||||||
try {
|
|
||||||
callDecode(ctx, e.getChannel(), buf, e.getRemoteAddress());
|
|
||||||
} finally {
|
|
||||||
int readable = buf.readableBytes();
|
|
||||||
if (readable == 0) {
|
|
||||||
// nothing readable left so reset the state
|
|
||||||
cumulation = null;
|
|
||||||
} else {
|
} else {
|
||||||
int cap = buf.capacity();
|
cumulation = newCumulation = null;
|
||||||
|
|
||||||
if (readable != cap && cap > copyThreshold) {
|
|
||||||
// the readable bytes are > as the configured
|
|
||||||
// copyThreshold, so create a new buffer and copy the
|
|
||||||
// bytes into it
|
|
||||||
cumulation = newCumulationBuffer(ctx, buf.readableBytes());
|
|
||||||
cumulation.writeBytes(buf);
|
|
||||||
|
|
||||||
} else {
|
|
||||||
if (readable == cap) {
|
|
||||||
cumulation = buf;
|
|
||||||
} else {
|
|
||||||
// create a new cumulation buffer that holds the
|
|
||||||
// unwrapped parts of the CompositeChannelBuffer
|
|
||||||
// that are not read yet.
|
|
||||||
cumulation = ChannelBuffers.wrappedBuffer(((CompositeChannelBuffer) buf)
|
|
||||||
.decompose(buf.readerIndex(), buf.readableBytes())
|
|
||||||
.toArray(new ChannelBuffer[0]));
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
return newCumulation;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1,532 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2012 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.jboss.netty.handler.codec.frame;
|
|
||||||
|
|
||||||
import java.net.SocketAddress;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
|
||||||
import org.jboss.netty.buffer.ChannelBufferFactory;
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffers;
|
|
||||||
import org.jboss.netty.buffer.CompositeChannelBuffer;
|
|
||||||
import org.jboss.netty.channel.Channel;
|
|
||||||
import org.jboss.netty.channel.ChannelHandler;
|
|
||||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
|
||||||
import org.jboss.netty.channel.ChannelPipeline;
|
|
||||||
import org.jboss.netty.channel.ChannelStateEvent;
|
|
||||||
import org.jboss.netty.channel.ChannelUpstreamHandler;
|
|
||||||
import org.jboss.netty.channel.Channels;
|
|
||||||
import org.jboss.netty.channel.ExceptionEvent;
|
|
||||||
import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
|
|
||||||
import org.jboss.netty.channel.MessageEvent;
|
|
||||||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
|
||||||
import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Decodes the received {@link ChannelBuffer}s into a meaningful frame object.
|
|
||||||
* <p>
|
|
||||||
* In a stream-based transport such as TCP/IP, packets can be fragmented and
|
|
||||||
* reassembled during transmission even in a LAN environment. For example,
|
|
||||||
* let us assume you have received three packets:
|
|
||||||
* <pre>
|
|
||||||
* +-----+-----+-----+
|
|
||||||
* | ABC | DEF | GHI |
|
|
||||||
* +-----+-----+-----+
|
|
||||||
* </pre>
|
|
||||||
* because of the packet fragmentation, a server can receive them like the
|
|
||||||
* following:
|
|
||||||
* <pre>
|
|
||||||
* +----+-------+---+---+
|
|
||||||
* | AB | CDEFG | H | I |
|
|
||||||
* +----+-------+---+---+
|
|
||||||
* </pre>
|
|
||||||
* <p>
|
|
||||||
* {@link ZeroCopyFrameDecoder} helps you defrag the received packets into one or more
|
|
||||||
* meaningful <strong>frames</strong> that could be easily understood by the
|
|
||||||
* application logic. In case of the example above, your {@link ZeroCopyFrameDecoder}
|
|
||||||
* implementation could defrag the received packets like the following:
|
|
||||||
* <pre>
|
|
||||||
* +-----+-----+-----+
|
|
||||||
* | ABC | DEF | GHI |
|
|
||||||
* +-----+-----+-----+
|
|
||||||
* </pre>
|
|
||||||
* <p>
|
|
||||||
* The following code shows an example handler which decodes a frame whose
|
|
||||||
* first 4 bytes header represents the length of the frame, excluding the
|
|
||||||
* header.
|
|
||||||
* <pre>
|
|
||||||
* MESSAGE FORMAT
|
|
||||||
* ==============
|
|
||||||
*
|
|
||||||
* Offset: 0 4 (Length + 4)
|
|
||||||
* +--------+------------------------+
|
|
||||||
* Fields: | Length | Actual message content |
|
|
||||||
* +--------+------------------------+
|
|
||||||
*
|
|
||||||
* DECODER IMPLEMENTATION
|
|
||||||
* ======================
|
|
||||||
*
|
|
||||||
* public class IntegerHeaderFrameDecoder extends {@link ZeroCopyFrameDecoder} {
|
|
||||||
*
|
|
||||||
* {@code @Override}
|
|
||||||
* protected Object decode({@link ChannelHandlerContext} ctx,
|
|
||||||
* {@link Channel} channel,
|
|
||||||
* {@link ChannelBuffer} buf) throws Exception {
|
|
||||||
*
|
|
||||||
* // Make sure if the length field was received.
|
|
||||||
* if (buf.readableBytes() < 4) {
|
|
||||||
* // The length field was not received yet - return null.
|
|
||||||
* // This method will be invoked again when more packets are
|
|
||||||
* // received and appended to the buffer.
|
|
||||||
* return <strong>null</strong>;
|
|
||||||
* }
|
|
||||||
*
|
|
||||||
* // The length field is in the buffer.
|
|
||||||
*
|
|
||||||
* // Mark the current buffer position before reading the length field
|
|
||||||
* // because the whole frame might not be in the buffer yet.
|
|
||||||
* // We will reset the buffer position to the marked position if
|
|
||||||
* // there's not enough bytes in the buffer.
|
|
||||||
* buf.markReaderIndex();
|
|
||||||
*
|
|
||||||
* // Read the length field.
|
|
||||||
* int length = buf.readInt();
|
|
||||||
*
|
|
||||||
* // Make sure if there's enough bytes in the buffer.
|
|
||||||
* if (buf.readableBytes() < length) {
|
|
||||||
* // The whole bytes were not received yet - return null.
|
|
||||||
* // This method will be invoked again when more packets are
|
|
||||||
* // received and appended to the buffer.
|
|
||||||
*
|
|
||||||
* // Reset to the marked position to read the length field again
|
|
||||||
* // next time.
|
|
||||||
* buf.resetReaderIndex();
|
|
||||||
*
|
|
||||||
* return <strong>null</strong>;
|
|
||||||
* }
|
|
||||||
*
|
|
||||||
* // There's enough bytes in the buffer. Read it.
|
|
||||||
* {@link ChannelBuffer} frame = buf.readBytes(length);
|
|
||||||
*
|
|
||||||
* // Successfully decoded a frame. Return the decoded frame.
|
|
||||||
* return <strong>frame</strong>;
|
|
||||||
* }
|
|
||||||
* }
|
|
||||||
* </pre>
|
|
||||||
*
|
|
||||||
* <h3>Returning a POJO rather than a {@link ChannelBuffer}</h3>
|
|
||||||
* <p>
|
|
||||||
* Please note that you can return an object of a different type than
|
|
||||||
* {@link ChannelBuffer} in your {@code decode()} and {@code decodeLast()}
|
|
||||||
* implementation. For example, you could return a
|
|
||||||
* <a href="http://en.wikipedia.org/wiki/POJO">POJO</a> so that the next
|
|
||||||
* {@link ChannelUpstreamHandler} receives a {@link MessageEvent} which
|
|
||||||
* contains a POJO rather than a {@link ChannelBuffer}.
|
|
||||||
*
|
|
||||||
* <h3>Replacing a decoder with another decoder in a pipeline</h3>
|
|
||||||
* <p>
|
|
||||||
* If you are going to write a protocol multiplexer, you will probably want to
|
|
||||||
* replace a {@link ZeroCopyFrameDecoder} (protocol detector) with another
|
|
||||||
* {@link ZeroCopyFrameDecoder} or {@link ReplayingDecoder} (actual protocol decoder).
|
|
||||||
* It is not possible to achieve this simply by calling
|
|
||||||
* {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but
|
|
||||||
* some additional steps are required:
|
|
||||||
* <pre>
|
|
||||||
* public class FirstDecoder extends {@link ZeroCopyFrameDecoder} {
|
|
||||||
*
|
|
||||||
* public FirstDecoder() {
|
|
||||||
* super(true); // Enable unfold
|
|
||||||
* }
|
|
||||||
*
|
|
||||||
* {@code @Override}
|
|
||||||
* protected Object decode({@link ChannelHandlerContext} ctx,
|
|
||||||
* {@link Channel} channel,
|
|
||||||
* {@link ChannelBuffer} buf) {
|
|
||||||
* ...
|
|
||||||
* // Decode the first message
|
|
||||||
* Object firstMessage = ...;
|
|
||||||
*
|
|
||||||
* // Add the second decoder
|
|
||||||
* ctx.getPipeline().addLast("second", new SecondDecoder());
|
|
||||||
*
|
|
||||||
* // Remove the first decoder (me)
|
|
||||||
* ctx.getPipeline().remove(this);
|
|
||||||
*
|
|
||||||
* if (buf.readable()) {
|
|
||||||
* // Hand off the remaining data to the second decoder
|
|
||||||
* return new Object[] { firstMessage, buf.readBytes(buf.readableBytes()) };
|
|
||||||
* } else {
|
|
||||||
* // Nothing to hand off
|
|
||||||
* return firstMessage;
|
|
||||||
* }
|
|
||||||
* }
|
|
||||||
* }
|
|
||||||
* </pre>
|
|
||||||
*
|
|
||||||
* @apiviz.landmark
|
|
||||||
*/
|
|
||||||
public abstract class ZeroCopyFrameDecoder
|
|
||||||
extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler {
|
|
||||||
|
|
||||||
private final boolean unfold;
|
|
||||||
protected List<ChannelBuffer> cumulation;
|
|
||||||
private volatile ChannelHandlerContext ctx;
|
|
||||||
private int copyThreshold;
|
|
||||||
|
|
||||||
protected ZeroCopyFrameDecoder() {
|
|
||||||
this(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected ZeroCopyFrameDecoder(boolean unfold) {
|
|
||||||
this.unfold = unfold;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the maximal unused capacity of the internal cumulation ChannelBuffer
|
|
||||||
* before the {@link ZeroCopyFrameDecoder} tries to minimize the memory usage by
|
|
||||||
* "byte copy".
|
|
||||||
*
|
|
||||||
*
|
|
||||||
* What you use here really depends on your application and need. Using
|
|
||||||
* {@link Integer#MAX_VALUE} will disable all byte copies but give you the
|
|
||||||
* cost of a higher memory usage if big {@link ChannelBuffer}'s will be
|
|
||||||
* received.
|
|
||||||
*
|
|
||||||
* By default a threshold of <code>0</code> is used, which means it will
|
|
||||||
* always copy to try to reduce memory usage
|
|
||||||
*
|
|
||||||
*
|
|
||||||
* @param copyThreshold
|
|
||||||
* the threshold (in bytes) or {@link Integer#MAX_VALUE} to
|
|
||||||
* disable it. The value must be at least 0
|
|
||||||
* @throws IllegalStateException
|
|
||||||
* get thrown if someone tries to change this setting after the
|
|
||||||
* Decoder was added to the {@link ChannelPipeline}
|
|
||||||
*/
|
|
||||||
public final void setMaxUnusedBufferCapacity(int copyThreshold) {
|
|
||||||
if (copyThreshold < 0) {
|
|
||||||
throw new IllegalArgumentException("MaxUnusedBufferCapacity must be >= 0");
|
|
||||||
}
|
|
||||||
if (ctx == null) {
|
|
||||||
this.copyThreshold = copyThreshold;
|
|
||||||
} else {
|
|
||||||
throw new IllegalStateException("MaxWastedBufferCapacity " +
|
|
||||||
"can only be changed before the Decoder was added to the ChannelPipeline");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a compact slice of this buffer's readable bytes.
|
|
||||||
*
|
|
||||||
* The returned buffer may or may not share the content area with the buffer
|
|
||||||
* given as an argument while they maintain separate indexes and marks.
|
|
||||||
* If more than the maximal unused buffer capacity is unused then the
|
|
||||||
* content is copied to a new buffer to conserve memory.
|
|
||||||
*
|
|
||||||
* @param buffer ChannelBuffer to compact
|
|
||||||
* @return a compact slice of buffer
|
|
||||||
*/
|
|
||||||
private ChannelBuffer compactBuffer(ChannelBuffer buffer) {
|
|
||||||
if (buffer.capacity() - buffer.readableBytes() > copyThreshold) {
|
|
||||||
ChannelBuffer copy = newCumulationBuffer(ctx, buffer.readableBytes());
|
|
||||||
copy.writeBytes(buffer);
|
|
||||||
return copy;
|
|
||||||
} else {
|
|
||||||
return buffer.slice();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void messageReceived(
|
|
||||||
ChannelHandlerContext ctx, MessageEvent e) throws Exception {
|
|
||||||
|
|
||||||
Object m = e.getMessage();
|
|
||||||
if (!(m instanceof ChannelBuffer)) {
|
|
||||||
ctx.sendUpstream(e);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
ChannelBuffer input = (ChannelBuffer) m;
|
|
||||||
if (!input.readable()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (cumulation == null) {
|
|
||||||
// Wrap in try / finally.
|
|
||||||
//
|
|
||||||
// See https://github.com/netty/netty/issues/364
|
|
||||||
try {
|
|
||||||
// the cumulation buffer is not created yet so just pass the input to callDecode(...) method
|
|
||||||
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
|
|
||||||
} finally {
|
|
||||||
if (input.readable()) {
|
|
||||||
// unread data is left so create a cumulation buffer
|
|
||||||
cumulation = new ArrayList<ChannelBuffer>();
|
|
||||||
cumulation.add(compactBuffer(input));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
cumulation.add(compactBuffer(input));
|
|
||||||
|
|
||||||
CompositeChannelBuffer buf =
|
|
||||||
new CompositeChannelBuffer(cumulation.get(0).order(), cumulation, false);
|
|
||||||
|
|
||||||
// Wrap in try / finally.
|
|
||||||
//
|
|
||||||
// See https://github.com/netty/netty/issues/364
|
|
||||||
try {
|
|
||||||
callDecode(ctx, e.getChannel(), buf, e.getRemoteAddress());
|
|
||||||
} finally {
|
|
||||||
if (!buf.readable()) {
|
|
||||||
// nothing readable left so reset the state
|
|
||||||
cumulation = null;
|
|
||||||
} else if (buf.readableBytes() != buf.capacity()) {
|
|
||||||
// part of the buffer was read, but not all
|
|
||||||
int read = buf.capacity() - buf.readableBytes();
|
|
||||||
|
|
||||||
// get rid of fully read leading buffers
|
|
||||||
int i = 0;
|
|
||||||
while (read >= cumulation.get(i).readableBytes()) {
|
|
||||||
read -= cumulation.get(i).readableBytes();
|
|
||||||
i++;
|
|
||||||
}
|
|
||||||
cumulation.subList(0, i).clear();
|
|
||||||
|
|
||||||
// compact partially read leading buffer
|
|
||||||
if (read > 0) {
|
|
||||||
ChannelBuffer first = cumulation.get(0);
|
|
||||||
first.readerIndex(read);
|
|
||||||
cumulation.set(0, compactBuffer(first));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelDisconnected(
|
|
||||||
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
|
|
||||||
cleanup(ctx, e);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelClosed(
|
|
||||||
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
|
|
||||||
cleanup(ctx, e);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void exceptionCaught(
|
|
||||||
ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
|
|
||||||
ctx.sendUpstream(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Decodes the received packets so far into a frame.
|
|
||||||
*
|
|
||||||
* @param ctx the context of this handler
|
|
||||||
* @param channel the current channel
|
|
||||||
* @param buffer the cumulative buffer of received packets so far.
|
|
||||||
* Note that the buffer might be empty, which means you
|
|
||||||
* should not make an assumption that the buffer contains
|
|
||||||
* at least one byte in your decoder implementation.
|
|
||||||
*
|
|
||||||
* @return the decoded frame if a full frame was received and decoded.
|
|
||||||
* {@code null} if there's not enough data in the buffer to decode a frame.
|
|
||||||
*/
|
|
||||||
protected abstract Object decode(
|
|
||||||
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Decodes the received data so far into a frame when the channel is
|
|
||||||
* disconnected.
|
|
||||||
*
|
|
||||||
* @param ctx the context of this handler
|
|
||||||
* @param channel the current channel
|
|
||||||
* @param buffer the cumulative buffer of received packets so far.
|
|
||||||
* Note that the buffer might be empty, which means you
|
|
||||||
* should not make an assumption that the buffer contains
|
|
||||||
* at least one byte in your decoder implementation.
|
|
||||||
*
|
|
||||||
* @return the decoded frame if a full frame was received and decoded.
|
|
||||||
* {@code null} if there's not enough data in the buffer to decode a frame.
|
|
||||||
*/
|
|
||||||
protected Object decodeLast(
|
|
||||||
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
|
|
||||||
return decode(ctx, channel, buffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void callDecode(
|
|
||||||
ChannelHandlerContext context, Channel channel,
|
|
||||||
ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception {
|
|
||||||
|
|
||||||
while (cumulation.readable()) {
|
|
||||||
int oldReaderIndex = cumulation.readerIndex();
|
|
||||||
Object frame = decode(context, channel, cumulation);
|
|
||||||
if (frame == null) {
|
|
||||||
if (oldReaderIndex == cumulation.readerIndex()) {
|
|
||||||
// Seems like more data is required.
|
|
||||||
// Let us wait for the next notification.
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
// Previous data has been discarded.
|
|
||||||
// Probably it is reading on.
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
} else if (oldReaderIndex == cumulation.readerIndex()) {
|
|
||||||
throw new IllegalStateException(
|
|
||||||
"decode() method must read at least one byte " +
|
|
||||||
"if it returned a frame (caused by: " + getClass() + ")");
|
|
||||||
}
|
|
||||||
|
|
||||||
unfoldAndFireMessageReceived(context, remoteAddress, frame);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected final void unfoldAndFireMessageReceived(
|
|
||||||
ChannelHandlerContext context, SocketAddress remoteAddress, Object result) {
|
|
||||||
if (unfold) {
|
|
||||||
if (result instanceof Object[]) {
|
|
||||||
for (Object r: (Object[]) result) {
|
|
||||||
Channels.fireMessageReceived(context, r, remoteAddress);
|
|
||||||
}
|
|
||||||
} else if (result instanceof Iterable<?>) {
|
|
||||||
for (Object r: (Iterable<?>) result) {
|
|
||||||
Channels.fireMessageReceived(context, r, remoteAddress);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Channels.fireMessageReceived(context, result, remoteAddress);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Channels.fireMessageReceived(context, result, remoteAddress);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets called on {@link #channelDisconnected(ChannelHandlerContext, ChannelStateEvent)} and
|
|
||||||
* {@link #channelClosed(ChannelHandlerContext, ChannelStateEvent)}
|
|
||||||
*/
|
|
||||||
protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
|
|
||||||
throws Exception {
|
|
||||||
try {
|
|
||||||
List<ChannelBuffer> cumulation = this.cumulation;
|
|
||||||
if (cumulation == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.cumulation = null;
|
|
||||||
|
|
||||||
CompositeChannelBuffer buf =
|
|
||||||
new CompositeChannelBuffer(cumulation.get(0).order(), cumulation, false);
|
|
||||||
|
|
||||||
// Make sure all frames are read before notifying a closed channel.
|
|
||||||
callDecode(ctx, ctx.getChannel(), buf, null);
|
|
||||||
|
|
||||||
// Call decodeLast() finally. Please note that decodeLast() is
|
|
||||||
// called even if there's nothing more to read from the buffer to
|
|
||||||
// notify a user that the connection was closed explicitly.
|
|
||||||
Object partialFrame = decodeLast(ctx, ctx.getChannel(), buf);
|
|
||||||
if (partialFrame != null) {
|
|
||||||
unfoldAndFireMessageReceived(ctx, null, partialFrame);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
ctx.sendUpstream(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new {@link ChannelBuffer} which is used for the cumulation.
|
|
||||||
* Sub-classes may override this.
|
|
||||||
*
|
|
||||||
* @param ctx {@link ChannelHandlerContext} for this handler
|
|
||||||
* @return buffer the {@link ChannelBuffer} which is used for cumulation
|
|
||||||
*/
|
|
||||||
protected ChannelBuffer newCumulationBuffer(
|
|
||||||
ChannelHandlerContext ctx, int minimumCapacity) {
|
|
||||||
ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
|
|
||||||
return factory.getBuffer(minimumCapacity);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Replace this {@link ZeroCopyFrameDecoder} in the {@link ChannelPipeline} with the given {@link ChannelHandler}.
|
|
||||||
* All remaining bytes in the {@link ChannelBuffer} will get send to the new {@link ChannelHandler} that was used
|
|
||||||
* as replacement
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public void replace(String handlerName, ChannelHandler handler) {
|
|
||||||
if (ctx == null) {
|
|
||||||
throw new IllegalStateException(
|
|
||||||
"Replace cann only be called once the FrameDecoder is added to the ChannelPipeline");
|
|
||||||
}
|
|
||||||
ChannelPipeline pipeline = ctx.getPipeline();
|
|
||||||
pipeline.addAfter(ctx.getName(), handlerName, handler);
|
|
||||||
|
|
||||||
try {
|
|
||||||
if (cumulation != null) {
|
|
||||||
CompositeChannelBuffer buf =
|
|
||||||
new CompositeChannelBuffer(cumulation.get(0).order(), cumulation, false);
|
|
||||||
Channels.fireMessageReceived(ctx, buf);
|
|
||||||
cumulation = null;
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
pipeline.remove(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the actual number of readable bytes in the internal cumulative
|
|
||||||
* buffer of this decoder. You usually do not need to rely on this value
|
|
||||||
* to write a decoder. Use it only when you muse use it at your own risk.
|
|
||||||
* This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
|
|
||||||
*/
|
|
||||||
protected int actualReadableBytes() {
|
|
||||||
return internalBuffer().readableBytes();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the internal cumulative buffer of this decoder. You usually
|
|
||||||
* do not need to access the internal buffer directly to write a decoder.
|
|
||||||
* Use it only when you must use it at your own risk.
|
|
||||||
*/
|
|
||||||
protected ChannelBuffer internalBuffer() {
|
|
||||||
List<ChannelBuffer> buf = cumulation;
|
|
||||||
if (buf == null) {
|
|
||||||
return ChannelBuffers.EMPTY_BUFFER;
|
|
||||||
}
|
|
||||||
return new CompositeChannelBuffer(cumulation.get(0).order(), cumulation, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
this.ctx = ctx;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
// Nothing to do..
|
|
||||||
}
|
|
||||||
|
|
||||||
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
// Nothing to do..
|
|
||||||
}
|
|
||||||
|
|
||||||
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
// Nothing to do..
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -18,8 +18,6 @@ package org.jboss.netty.handler.codec.replay;
|
|||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
import org.jboss.netty.buffer.ChannelBuffer;
|
||||||
import org.jboss.netty.buffer.ChannelBuffers;
|
|
||||||
import org.jboss.netty.buffer.CompositeChannelBuffer;
|
|
||||||
import org.jboss.netty.channel.Channel;
|
import org.jboss.netty.channel.Channel;
|
||||||
import org.jboss.netty.channel.ChannelHandler;
|
import org.jboss.netty.channel.ChannelHandler;
|
||||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||||
@ -286,7 +284,7 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
|
|||||||
extends FrameDecoder {
|
extends FrameDecoder {
|
||||||
|
|
||||||
|
|
||||||
private ReplayingDecoderBuffer replayable;
|
private final ReplayingDecoderBuffer replayable = new ReplayingDecoderBuffer(this);
|
||||||
private T state;
|
private T state;
|
||||||
private int checkpoint;
|
private int checkpoint;
|
||||||
private boolean needsCleanup;
|
private boolean needsCleanup;
|
||||||
@ -315,6 +313,11 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
|
|||||||
this.state = initialState;
|
this.state = initialState;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ChannelBuffer internalBuffer() {
|
||||||
|
return super.internalBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stores the internal cumulative buffer's reader position.
|
* Stores the internal cumulative buffer's reader position.
|
||||||
*/
|
*/
|
||||||
@ -426,30 +429,24 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
|
|||||||
// the cumulation buffer is not created yet so just pass the input
|
// the cumulation buffer is not created yet so just pass the input
|
||||||
// to callDecode(...) method
|
// to callDecode(...) method
|
||||||
cumulation = input;
|
cumulation = input;
|
||||||
replayable = new ReplayingDecoderBuffer(input);
|
|
||||||
|
|
||||||
int oldReaderIndex = input.readerIndex();
|
int oldReaderIndex = input.readerIndex();
|
||||||
int inputSize = input.readableBytes();
|
int inputSize = input.readableBytes();
|
||||||
|
|
||||||
// Wrap in try / finally.
|
|
||||||
//
|
|
||||||
// See https://github.com/netty/netty/issues/364
|
|
||||||
try {
|
try {
|
||||||
callDecode(
|
callDecode(
|
||||||
ctx, e.getChannel(),
|
ctx, e.getChannel(),
|
||||||
input, replayable,
|
input, replayable,
|
||||||
e.getRemoteAddress());
|
e.getRemoteAddress());
|
||||||
} finally {
|
} finally {
|
||||||
int readable = input.readableBytes();
|
int readableBytes = input.readableBytes();
|
||||||
|
if (readableBytes > 0) {
|
||||||
if (readable > 0) {
|
int inputCapacity = input.capacity();
|
||||||
int cap = input.capacity();
|
|
||||||
boolean copy = false;
|
|
||||||
// check if readableBytes == capacity we can safe the copy as we will not be able to
|
// check if readableBytes == capacity we can safe the copy as we will not be able to
|
||||||
// optimize memory usage anyway
|
// optimize memory usage anyway
|
||||||
if (readable != cap && cap > getMaxCumulationBufferCapacity()) {
|
boolean copy =
|
||||||
copy = true;
|
readableBytes != inputCapacity &&
|
||||||
}
|
inputCapacity > getMaxCumulationBufferCapacity();
|
||||||
|
|
||||||
// seems like there is something readable left in the input buffer
|
// seems like there is something readable left in the input buffer
|
||||||
// or decoder wants a replay - create the cumulation buffer and
|
// or decoder wants a replay - create the cumulation buffer and
|
||||||
@ -458,86 +455,39 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
|
|||||||
if (checkpoint > 0) {
|
if (checkpoint > 0) {
|
||||||
int bytesToPreserve = inputSize - (checkpoint - oldReaderIndex);
|
int bytesToPreserve = inputSize - (checkpoint - oldReaderIndex);
|
||||||
if (copy) {
|
if (copy) {
|
||||||
cumulation = this.cumulation =
|
this.cumulation = cumulation = newCumulationBuffer(ctx, bytesToPreserve);
|
||||||
newCumulationBuffer(ctx, bytesToPreserve);
|
|
||||||
cumulation.writeBytes(input, checkpoint, bytesToPreserve);
|
cumulation.writeBytes(input, checkpoint, bytesToPreserve);
|
||||||
} else {
|
} else {
|
||||||
cumulation = this.cumulation =
|
this.cumulation = cumulation = input.slice(checkpoint, bytesToPreserve);
|
||||||
input.slice(checkpoint, bytesToPreserve);
|
|
||||||
}
|
}
|
||||||
} else if (checkpoint == 0) {
|
} else if (checkpoint == 0) {
|
||||||
if (copy) {
|
if (copy) {
|
||||||
cumulation = this.cumulation =
|
this.cumulation = cumulation = newCumulationBuffer(ctx, inputSize);
|
||||||
newCumulationBuffer(ctx, inputSize);
|
|
||||||
cumulation.writeBytes(input, oldReaderIndex, inputSize);
|
cumulation.writeBytes(input, oldReaderIndex, inputSize);
|
||||||
cumulation.readerIndex(input.readerIndex());
|
cumulation.readerIndex(input.readerIndex());
|
||||||
} else {
|
} else {
|
||||||
cumulation = this.cumulation =
|
this.cumulation = cumulation = input.slice(oldReaderIndex, inputSize);
|
||||||
input.slice(oldReaderIndex, inputSize);
|
|
||||||
cumulation.readerIndex(input.readerIndex());
|
cumulation.readerIndex(input.readerIndex());
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (copy) {
|
if (copy) {
|
||||||
cumulation = this.cumulation =
|
this.cumulation = cumulation = newCumulationBuffer(ctx, input.readableBytes());
|
||||||
newCumulationBuffer(ctx, input.readableBytes());
|
|
||||||
cumulation.writeBytes(input);
|
cumulation.writeBytes(input);
|
||||||
} else {
|
} else {
|
||||||
cumulation = this.cumulation =
|
this.cumulation = cumulation = input;
|
||||||
input;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
replayable = new ReplayingDecoderBuffer(cumulation);
|
|
||||||
} else {
|
} else {
|
||||||
cumulation = null;
|
cumulation = null;
|
||||||
replayable = ReplayingDecoderBuffer.EMPTY_BUFFER;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
assert cumulation.readable();
|
input = appendToCumulation(input);
|
||||||
// wrap the cumulation and input
|
|
||||||
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(cumulation, input);
|
|
||||||
cumulation = buf;
|
|
||||||
replayable = new ReplayingDecoderBuffer(cumulation);
|
|
||||||
|
|
||||||
// Wrap in try / finally.
|
|
||||||
//
|
|
||||||
// See https://github.com/netty/netty/issues/364
|
|
||||||
try {
|
try {
|
||||||
callDecode(ctx, e.getChannel(), buf, replayable, e.getRemoteAddress());
|
callDecode(ctx, e.getChannel(), input, replayable, e.getRemoteAddress());
|
||||||
} finally {
|
} finally {
|
||||||
int readable = buf.readableBytes();
|
updateCumulation(ctx, input);
|
||||||
if (readable == 0) {
|
|
||||||
// nothing readable left so reset the state
|
|
||||||
cumulation = null;
|
|
||||||
replayable = ReplayingDecoderBuffer.EMPTY_BUFFER;
|
|
||||||
} else {
|
|
||||||
int cap = buf.capacity();
|
|
||||||
|
|
||||||
if (readable != cap && cap > getMaxCumulationBufferCapacity()) {
|
|
||||||
// the readable bytes are > as the configured
|
|
||||||
// copyThreshold, so create a new buffer and copy the
|
|
||||||
// bytes into it
|
|
||||||
cumulation = newCumulationBuffer(ctx, buf.readableBytes());
|
|
||||||
cumulation.writeBytes(buf);
|
|
||||||
|
|
||||||
} else {
|
|
||||||
if (readable == cap) {
|
|
||||||
cumulation = buf;
|
|
||||||
} else {
|
|
||||||
// create a new cumulation buffer that holds the
|
|
||||||
// unwrapped parts of the CompositeChannelBuffer
|
|
||||||
// that are not read yet.
|
|
||||||
cumulation = ChannelBuffers.wrappedBuffer(((CompositeChannelBuffer) buf)
|
|
||||||
.decompose(buf.readerIndex(), buf.readableBytes())
|
|
||||||
.toArray(new ChannelBuffer[0]));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
|
||||||
replayable = new ReplayingDecoderBuffer(cumulation);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -600,7 +550,6 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
|
|||||||
needsCleanup = false;
|
needsCleanup = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.cumulation = null;
|
|
||||||
replayable.terminate();
|
replayable.terminate();
|
||||||
|
|
||||||
if (cumulation != null && cumulation.readable()) {
|
if (cumulation != null && cumulation.readable()) {
|
||||||
@ -612,15 +561,16 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
|
|||||||
// called even if there's nothing more to read from the buffer to
|
// called even if there's nothing more to read from the buffer to
|
||||||
// notify a user that the connection was closed explicitly.
|
// notify a user that the connection was closed explicitly.
|
||||||
Object partiallyDecoded = decodeLast(ctx, e.getChannel(), replayable, state);
|
Object partiallyDecoded = decodeLast(ctx, e.getChannel(), replayable, state);
|
||||||
|
|
||||||
|
this.cumulation = null;
|
||||||
|
|
||||||
if (partiallyDecoded != null) {
|
if (partiallyDecoded != null) {
|
||||||
unfoldAndFireMessageReceived(ctx, null, partiallyDecoded);
|
unfoldAndFireMessageReceived(ctx, null, partiallyDecoded);
|
||||||
}
|
}
|
||||||
} catch (ReplayError replay) {
|
} catch (ReplayError replay) {
|
||||||
// Ignore
|
// Ignore
|
||||||
} finally {
|
} finally {
|
||||||
replayable = ReplayingDecoderBuffer.EMPTY_BUFFER;
|
|
||||||
ctx.sendUpstream(e);
|
ctx.sendUpstream(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -27,23 +27,20 @@ import java.nio.charset.Charset;
|
|||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
import org.jboss.netty.buffer.ChannelBuffer;
|
||||||
import org.jboss.netty.buffer.ChannelBufferFactory;
|
import org.jboss.netty.buffer.ChannelBufferFactory;
|
||||||
import org.jboss.netty.buffer.ChannelBufferIndexFinder;
|
import org.jboss.netty.buffer.ChannelBufferIndexFinder;
|
||||||
import org.jboss.netty.buffer.ChannelBuffers;
|
|
||||||
|
|
||||||
class ReplayingDecoderBuffer implements ChannelBuffer {
|
class ReplayingDecoderBuffer implements ChannelBuffer {
|
||||||
|
|
||||||
private static final Error REPLAY = new ReplayError();
|
private static final Error REPLAY = new ReplayError();
|
||||||
|
|
||||||
private final ChannelBuffer buffer;
|
private final ReplayingDecoder<?> parent;
|
||||||
private boolean terminated;
|
private boolean terminated;
|
||||||
|
|
||||||
public static ReplayingDecoderBuffer EMPTY_BUFFER = new ReplayingDecoderBuffer(ChannelBuffers.EMPTY_BUFFER);
|
ReplayingDecoderBuffer(ReplayingDecoder<?> parent) {
|
||||||
|
this.parent = parent;
|
||||||
static {
|
|
||||||
EMPTY_BUFFER.terminate();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ReplayingDecoderBuffer(ChannelBuffer buffer) {
|
private ChannelBuffer buf() {
|
||||||
this.buffer = buffer;
|
return parent.internalBuffer();
|
||||||
}
|
}
|
||||||
|
|
||||||
void terminate() {
|
void terminate() {
|
||||||
@ -52,14 +49,14 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
|
|
||||||
public int capacity() {
|
public int capacity() {
|
||||||
if (terminated) {
|
if (terminated) {
|
||||||
return buffer.capacity();
|
return buf().capacity();
|
||||||
} else {
|
} else {
|
||||||
return Integer.MAX_VALUE;
|
return Integer.MAX_VALUE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isDirect() {
|
public boolean isDirect() {
|
||||||
return buffer.isDirect();
|
return buf().isDirect();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean hasArray() {
|
public boolean hasArray() {
|
||||||
@ -93,7 +90,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
|
|
||||||
public ChannelBuffer copy(int index, int length) {
|
public ChannelBuffer copy(int index, int length) {
|
||||||
checkIndex(index, length);
|
checkIndex(index, length);
|
||||||
return buffer.copy(index, length);
|
return buf().copy(index, length);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void discardReadBytes() {
|
public void discardReadBytes() {
|
||||||
@ -110,22 +107,22 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
|
|
||||||
public byte getByte(int index) {
|
public byte getByte(int index) {
|
||||||
checkIndex(index);
|
checkIndex(index);
|
||||||
return buffer.getByte(index);
|
return buf().getByte(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
public short getUnsignedByte(int index) {
|
public short getUnsignedByte(int index) {
|
||||||
checkIndex(index);
|
checkIndex(index);
|
||||||
return buffer.getUnsignedByte(index);
|
return buf().getUnsignedByte(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void getBytes(int index, byte[] dst, int dstIndex, int length) {
|
public void getBytes(int index, byte[] dst, int dstIndex, int length) {
|
||||||
checkIndex(index, length);
|
checkIndex(index, length);
|
||||||
buffer.getBytes(index, dst, dstIndex, length);
|
buf().getBytes(index, dst, dstIndex, length);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void getBytes(int index, byte[] dst) {
|
public void getBytes(int index, byte[] dst) {
|
||||||
checkIndex(index, dst.length);
|
checkIndex(index, dst.length);
|
||||||
buffer.getBytes(index, dst);
|
buf().getBytes(index, dst);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void getBytes(int index, ByteBuffer dst) {
|
public void getBytes(int index, ByteBuffer dst) {
|
||||||
@ -134,7 +131,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
|
|
||||||
public void getBytes(int index, ChannelBuffer dst, int dstIndex, int length) {
|
public void getBytes(int index, ChannelBuffer dst, int dstIndex, int length) {
|
||||||
checkIndex(index, length);
|
checkIndex(index, length);
|
||||||
buffer.getBytes(index, dst, dstIndex, length);
|
buf().getBytes(index, dst, dstIndex, length);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void getBytes(int index, ChannelBuffer dst, int length) {
|
public void getBytes(int index, ChannelBuffer dst, int length) {
|
||||||
@ -157,52 +154,52 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
|
|
||||||
public int getInt(int index) {
|
public int getInt(int index) {
|
||||||
checkIndex(index, 4);
|
checkIndex(index, 4);
|
||||||
return buffer.getInt(index);
|
return buf().getInt(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getUnsignedInt(int index) {
|
public long getUnsignedInt(int index) {
|
||||||
checkIndex(index, 4);
|
checkIndex(index, 4);
|
||||||
return buffer.getUnsignedInt(index);
|
return buf().getUnsignedInt(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getLong(int index) {
|
public long getLong(int index) {
|
||||||
checkIndex(index, 8);
|
checkIndex(index, 8);
|
||||||
return buffer.getLong(index);
|
return buf().getLong(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getMedium(int index) {
|
public int getMedium(int index) {
|
||||||
checkIndex(index, 3);
|
checkIndex(index, 3);
|
||||||
return buffer.getMedium(index);
|
return buf().getMedium(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getUnsignedMedium(int index) {
|
public int getUnsignedMedium(int index) {
|
||||||
checkIndex(index, 3);
|
checkIndex(index, 3);
|
||||||
return buffer.getUnsignedMedium(index);
|
return buf().getUnsignedMedium(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
public short getShort(int index) {
|
public short getShort(int index) {
|
||||||
checkIndex(index, 2);
|
checkIndex(index, 2);
|
||||||
return buffer.getShort(index);
|
return buf().getShort(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getUnsignedShort(int index) {
|
public int getUnsignedShort(int index) {
|
||||||
checkIndex(index, 2);
|
checkIndex(index, 2);
|
||||||
return buffer.getUnsignedShort(index);
|
return buf().getUnsignedShort(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
public char getChar(int index) {
|
public char getChar(int index) {
|
||||||
checkIndex(index, 2);
|
checkIndex(index, 2);
|
||||||
return buffer.getChar(index);
|
return buf().getChar(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
public float getFloat(int index) {
|
public float getFloat(int index) {
|
||||||
checkIndex(index, 4);
|
checkIndex(index, 4);
|
||||||
return buffer.getFloat(index);
|
return buf().getFloat(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
public double getDouble(int index) {
|
public double getDouble(int index) {
|
||||||
checkIndex(index, 8);
|
checkIndex(index, 8);
|
||||||
return buffer.getDouble(index);
|
return buf().getDouble(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -211,7 +208,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public int indexOf(int fromIndex, int toIndex, byte value) {
|
public int indexOf(int fromIndex, int toIndex, byte value) {
|
||||||
int endIndex = buffer.indexOf(fromIndex, toIndex, value);
|
int endIndex = buf().indexOf(fromIndex, toIndex, value);
|
||||||
if (endIndex < 0) {
|
if (endIndex < 0) {
|
||||||
throw REPLAY;
|
throw REPLAY;
|
||||||
}
|
}
|
||||||
@ -220,7 +217,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
|
|
||||||
public int indexOf(int fromIndex, int toIndex,
|
public int indexOf(int fromIndex, int toIndex,
|
||||||
ChannelBufferIndexFinder indexFinder) {
|
ChannelBufferIndexFinder indexFinder) {
|
||||||
int endIndex = buffer.indexOf(fromIndex, toIndex, indexFinder);
|
int endIndex = buf().indexOf(fromIndex, toIndex, indexFinder);
|
||||||
if (endIndex < 0) {
|
if (endIndex < 0) {
|
||||||
throw REPLAY;
|
throw REPLAY;
|
||||||
}
|
}
|
||||||
@ -228,7 +225,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public int bytesBefore(byte value) {
|
public int bytesBefore(byte value) {
|
||||||
int bytes = buffer.bytesBefore(value);
|
int bytes = buf().bytesBefore(value);
|
||||||
if (bytes < 0) {
|
if (bytes < 0) {
|
||||||
throw REPLAY;
|
throw REPLAY;
|
||||||
}
|
}
|
||||||
@ -236,7 +233,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public int bytesBefore(ChannelBufferIndexFinder indexFinder) {
|
public int bytesBefore(ChannelBufferIndexFinder indexFinder) {
|
||||||
int bytes = buffer.bytesBefore(indexFinder);
|
int bytes = buf().bytesBefore(indexFinder);
|
||||||
if (bytes < 0) {
|
if (bytes < 0) {
|
||||||
throw REPLAY;
|
throw REPLAY;
|
||||||
}
|
}
|
||||||
@ -245,7 +242,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
|
|
||||||
public int bytesBefore(int length, byte value) {
|
public int bytesBefore(int length, byte value) {
|
||||||
checkReadableBytes(length);
|
checkReadableBytes(length);
|
||||||
int bytes = buffer.bytesBefore(length, value);
|
int bytes = buf().bytesBefore(length, value);
|
||||||
if (bytes < 0) {
|
if (bytes < 0) {
|
||||||
throw REPLAY;
|
throw REPLAY;
|
||||||
}
|
}
|
||||||
@ -254,7 +251,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
|
|
||||||
public int bytesBefore(int length, ChannelBufferIndexFinder indexFinder) {
|
public int bytesBefore(int length, ChannelBufferIndexFinder indexFinder) {
|
||||||
checkReadableBytes(length);
|
checkReadableBytes(length);
|
||||||
int bytes = buffer.bytesBefore(length, indexFinder);
|
int bytes = buf().bytesBefore(length, indexFinder);
|
||||||
if (bytes < 0) {
|
if (bytes < 0) {
|
||||||
throw REPLAY;
|
throw REPLAY;
|
||||||
}
|
}
|
||||||
@ -262,7 +259,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public int bytesBefore(int index, int length, byte value) {
|
public int bytesBefore(int index, int length, byte value) {
|
||||||
int bytes = buffer.bytesBefore(index, length, value);
|
int bytes = buf().bytesBefore(index, length, value);
|
||||||
if (bytes < 0) {
|
if (bytes < 0) {
|
||||||
throw REPLAY;
|
throw REPLAY;
|
||||||
}
|
}
|
||||||
@ -271,7 +268,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
|
|
||||||
public int bytesBefore(int index, int length,
|
public int bytesBefore(int index, int length,
|
||||||
ChannelBufferIndexFinder indexFinder) {
|
ChannelBufferIndexFinder indexFinder) {
|
||||||
int bytes = buffer.bytesBefore(index, length, indexFinder);
|
int bytes = buf().bytesBefore(index, length, indexFinder);
|
||||||
if (bytes < 0) {
|
if (bytes < 0) {
|
||||||
throw REPLAY;
|
throw REPLAY;
|
||||||
}
|
}
|
||||||
@ -279,7 +276,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void markReaderIndex() {
|
public void markReaderIndex() {
|
||||||
buffer.markReaderIndex();
|
buf().markReaderIndex();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void markWriterIndex() {
|
public void markWriterIndex() {
|
||||||
@ -287,43 +284,43 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public ChannelBufferFactory factory() {
|
public ChannelBufferFactory factory() {
|
||||||
return buffer.factory();
|
return buf().factory();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ByteOrder order() {
|
public ByteOrder order() {
|
||||||
return buffer.order();
|
return buf().order();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean readable() {
|
public boolean readable() {
|
||||||
return terminated? buffer.readable() : true;
|
return terminated? buf().readable() : true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int readableBytes() {
|
public int readableBytes() {
|
||||||
if (terminated) {
|
if (terminated) {
|
||||||
return buffer.readableBytes();
|
return buf().readableBytes();
|
||||||
} else {
|
} else {
|
||||||
return Integer.MAX_VALUE - buffer.readerIndex();
|
return Integer.MAX_VALUE - buf().readerIndex();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public byte readByte() {
|
public byte readByte() {
|
||||||
checkReadableBytes(1);
|
checkReadableBytes(1);
|
||||||
return buffer.readByte();
|
return buf().readByte();
|
||||||
}
|
}
|
||||||
|
|
||||||
public short readUnsignedByte() {
|
public short readUnsignedByte() {
|
||||||
checkReadableBytes(1);
|
checkReadableBytes(1);
|
||||||
return buffer.readUnsignedByte();
|
return buf().readUnsignedByte();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void readBytes(byte[] dst, int dstIndex, int length) {
|
public void readBytes(byte[] dst, int dstIndex, int length) {
|
||||||
checkReadableBytes(length);
|
checkReadableBytes(length);
|
||||||
buffer.readBytes(dst, dstIndex, length);
|
buf().readBytes(dst, dstIndex, length);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void readBytes(byte[] dst) {
|
public void readBytes(byte[] dst) {
|
||||||
checkReadableBytes(dst.length);
|
checkReadableBytes(dst.length);
|
||||||
buffer.readBytes(dst);
|
buf().readBytes(dst);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void readBytes(ByteBuffer dst) {
|
public void readBytes(ByteBuffer dst) {
|
||||||
@ -332,7 +329,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
|
|
||||||
public void readBytes(ChannelBuffer dst, int dstIndex, int length) {
|
public void readBytes(ChannelBuffer dst, int dstIndex, int length) {
|
||||||
checkReadableBytes(length);
|
checkReadableBytes(length);
|
||||||
buffer.readBytes(dst, dstIndex, length);
|
buf().readBytes(dst, dstIndex, length);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void readBytes(ChannelBuffer dst, int length) {
|
public void readBytes(ChannelBuffer dst, int length) {
|
||||||
@ -345,11 +342,11 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
|
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public ChannelBuffer readBytes(ChannelBufferIndexFinder endIndexFinder) {
|
public ChannelBuffer readBytes(ChannelBufferIndexFinder endIndexFinder) {
|
||||||
int endIndex = buffer.indexOf(buffer.readerIndex(), buffer.writerIndex(), endIndexFinder);
|
int endIndex = buf().indexOf(buf().readerIndex(), buf().writerIndex(), endIndexFinder);
|
||||||
if (endIndex < 0) {
|
if (endIndex < 0) {
|
||||||
throw REPLAY;
|
throw REPLAY;
|
||||||
}
|
}
|
||||||
return buffer.readBytes(endIndex - buffer.readerIndex());
|
return buf().readBytes(endIndex - buf().readerIndex());
|
||||||
}
|
}
|
||||||
|
|
||||||
public int readBytes(GatheringByteChannel out, int length)
|
public int readBytes(GatheringByteChannel out, int length)
|
||||||
@ -359,22 +356,22 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
|
|
||||||
public ChannelBuffer readBytes(int length) {
|
public ChannelBuffer readBytes(int length) {
|
||||||
checkReadableBytes(length);
|
checkReadableBytes(length);
|
||||||
return buffer.readBytes(length);
|
return buf().readBytes(length);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public ChannelBuffer readSlice(
|
public ChannelBuffer readSlice(
|
||||||
ChannelBufferIndexFinder endIndexFinder) {
|
ChannelBufferIndexFinder endIndexFinder) {
|
||||||
int endIndex = buffer.indexOf(buffer.readerIndex(), buffer.writerIndex(), endIndexFinder);
|
int endIndex = buf().indexOf(buf().readerIndex(), buf().writerIndex(), endIndexFinder);
|
||||||
if (endIndex < 0) {
|
if (endIndex < 0) {
|
||||||
throw REPLAY;
|
throw REPLAY;
|
||||||
}
|
}
|
||||||
return buffer.readSlice(endIndex - buffer.readerIndex());
|
return buf().readSlice(endIndex - buf().readerIndex());
|
||||||
}
|
}
|
||||||
|
|
||||||
public ChannelBuffer readSlice(int length) {
|
public ChannelBuffer readSlice(int length) {
|
||||||
checkReadableBytes(length);
|
checkReadableBytes(length);
|
||||||
return buffer.readSlice(length);
|
return buf().readSlice(length);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void readBytes(OutputStream out, int length) throws IOException {
|
public void readBytes(OutputStream out, int length) throws IOException {
|
||||||
@ -382,65 +379,65 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public int readerIndex() {
|
public int readerIndex() {
|
||||||
return buffer.readerIndex();
|
return buf().readerIndex();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void readerIndex(int readerIndex) {
|
public void readerIndex(int readerIndex) {
|
||||||
buffer.readerIndex(readerIndex);
|
buf().readerIndex(readerIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int readInt() {
|
public int readInt() {
|
||||||
checkReadableBytes(4);
|
checkReadableBytes(4);
|
||||||
return buffer.readInt();
|
return buf().readInt();
|
||||||
}
|
}
|
||||||
|
|
||||||
public long readUnsignedInt() {
|
public long readUnsignedInt() {
|
||||||
checkReadableBytes(4);
|
checkReadableBytes(4);
|
||||||
return buffer.readUnsignedInt();
|
return buf().readUnsignedInt();
|
||||||
}
|
}
|
||||||
|
|
||||||
public long readLong() {
|
public long readLong() {
|
||||||
checkReadableBytes(8);
|
checkReadableBytes(8);
|
||||||
return buffer.readLong();
|
return buf().readLong();
|
||||||
}
|
}
|
||||||
|
|
||||||
public int readMedium() {
|
public int readMedium() {
|
||||||
checkReadableBytes(3);
|
checkReadableBytes(3);
|
||||||
return buffer.readMedium();
|
return buf().readMedium();
|
||||||
}
|
}
|
||||||
|
|
||||||
public int readUnsignedMedium() {
|
public int readUnsignedMedium() {
|
||||||
checkReadableBytes(3);
|
checkReadableBytes(3);
|
||||||
return buffer.readUnsignedMedium();
|
return buf().readUnsignedMedium();
|
||||||
}
|
}
|
||||||
|
|
||||||
public short readShort() {
|
public short readShort() {
|
||||||
checkReadableBytes(2);
|
checkReadableBytes(2);
|
||||||
return buffer.readShort();
|
return buf().readShort();
|
||||||
}
|
}
|
||||||
|
|
||||||
public int readUnsignedShort() {
|
public int readUnsignedShort() {
|
||||||
checkReadableBytes(2);
|
checkReadableBytes(2);
|
||||||
return buffer.readUnsignedShort();
|
return buf().readUnsignedShort();
|
||||||
}
|
}
|
||||||
|
|
||||||
public char readChar() {
|
public char readChar() {
|
||||||
checkReadableBytes(2);
|
checkReadableBytes(2);
|
||||||
return buffer.readChar();
|
return buf().readChar();
|
||||||
}
|
}
|
||||||
|
|
||||||
public float readFloat() {
|
public float readFloat() {
|
||||||
checkReadableBytes(4);
|
checkReadableBytes(4);
|
||||||
return buffer.readFloat();
|
return buf().readFloat();
|
||||||
}
|
}
|
||||||
|
|
||||||
public double readDouble() {
|
public double readDouble() {
|
||||||
checkReadableBytes(8);
|
checkReadableBytes(8);
|
||||||
return buffer.readDouble();
|
return buf().readDouble();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void resetReaderIndex() {
|
public void resetReaderIndex() {
|
||||||
buffer.resetReaderIndex();
|
buf().resetReaderIndex();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void resetWriterIndex() {
|
public void resetWriterIndex() {
|
||||||
@ -523,18 +520,18 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
|
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public int skipBytes(ChannelBufferIndexFinder firstIndexFinder) {
|
public int skipBytes(ChannelBufferIndexFinder firstIndexFinder) {
|
||||||
int oldReaderIndex = buffer.readerIndex();
|
int oldReaderIndex = buf().readerIndex();
|
||||||
int newReaderIndex = buffer.indexOf(oldReaderIndex, buffer.writerIndex(), firstIndexFinder);
|
int newReaderIndex = buf().indexOf(oldReaderIndex, buf().writerIndex(), firstIndexFinder);
|
||||||
if (newReaderIndex < 0) {
|
if (newReaderIndex < 0) {
|
||||||
throw REPLAY;
|
throw REPLAY;
|
||||||
}
|
}
|
||||||
buffer.readerIndex(newReaderIndex);
|
buf().readerIndex(newReaderIndex);
|
||||||
return newReaderIndex - oldReaderIndex;
|
return newReaderIndex - oldReaderIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void skipBytes(int length) {
|
public void skipBytes(int length) {
|
||||||
checkReadableBytes(length);
|
checkReadableBytes(length);
|
||||||
buffer.skipBytes(length);
|
buf().skipBytes(length);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ChannelBuffer slice() {
|
public ChannelBuffer slice() {
|
||||||
@ -543,7 +540,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
|
|
||||||
public ChannelBuffer slice(int index, int length) {
|
public ChannelBuffer slice(int index, int length) {
|
||||||
checkIndex(index, length);
|
checkIndex(index, length);
|
||||||
return buffer.slice(index, length);
|
return buf().slice(index, length);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ByteBuffer toByteBuffer() {
|
public ByteBuffer toByteBuffer() {
|
||||||
@ -552,7 +549,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
|
|
||||||
public ByteBuffer toByteBuffer(int index, int length) {
|
public ByteBuffer toByteBuffer(int index, int length) {
|
||||||
checkIndex(index, length);
|
checkIndex(index, length);
|
||||||
return buffer.toByteBuffer(index, length);
|
return buf().toByteBuffer(index, length);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ByteBuffer[] toByteBuffers() {
|
public ByteBuffer[] toByteBuffers() {
|
||||||
@ -561,12 +558,12 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
|
|
||||||
public ByteBuffer[] toByteBuffers(int index, int length) {
|
public ByteBuffer[] toByteBuffers(int index, int length) {
|
||||||
checkIndex(index, length);
|
checkIndex(index, length);
|
||||||
return buffer.toByteBuffers(index, length);
|
return buf().toByteBuffers(index, length);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String toString(int index, int length, Charset charset) {
|
public String toString(int index, int length, Charset charset) {
|
||||||
checkIndex(index, length);
|
checkIndex(index, length);
|
||||||
return buffer.toString(index, length, charset);
|
return buf().toString(index, length, charset);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String toString(Charset charsetName) {
|
public String toString(Charset charsetName) {
|
||||||
@ -576,7 +573,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
@Deprecated
|
@Deprecated
|
||||||
public String toString(int index, int length, String charsetName) {
|
public String toString(int index, int length, String charsetName) {
|
||||||
checkIndex(index, length);
|
checkIndex(index, length);
|
||||||
return buffer.toString(index, length, charsetName);
|
return buf().toString(index, length, charsetName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
@Deprecated
|
||||||
@ -584,7 +581,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
int index, int length, String charsetName,
|
int index, int length, String charsetName,
|
||||||
ChannelBufferIndexFinder terminatorFinder) {
|
ChannelBufferIndexFinder terminatorFinder) {
|
||||||
checkIndex(index, length);
|
checkIndex(index, length);
|
||||||
return buffer.toString(index, length, charsetName, terminatorFinder);
|
return buf().toString(index, length, charsetName, terminatorFinder);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
@Deprecated
|
||||||
@ -671,7 +668,7 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public int writerIndex() {
|
public int writerIndex() {
|
||||||
return buffer.writerIndex();
|
return buf().writerIndex();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void writerIndex(int writerIndex) {
|
public void writerIndex(int writerIndex) {
|
||||||
@ -695,19 +692,19 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void checkIndex(int index) {
|
private void checkIndex(int index) {
|
||||||
if (index > buffer.writerIndex()) {
|
if (index > buf().writerIndex()) {
|
||||||
throw REPLAY;
|
throw REPLAY;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkIndex(int index, int length) {
|
private void checkIndex(int index, int length) {
|
||||||
if (index + length > buffer.writerIndex()) {
|
if (index + length > buf().writerIndex()) {
|
||||||
throw REPLAY;
|
throw REPLAY;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkReadableBytes(int readableBytes) {
|
private void checkReadableBytes(int readableBytes) {
|
||||||
if (buffer.readableBytes() < readableBytes) {
|
if (buf().readableBytes() < readableBytes) {
|
||||||
throw REPLAY;
|
throw REPLAY;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user