Remove special handling of Object[] in codec framework (a.k.a unfolding)

- Fixes #1229
- Primarily written by @normanmaurer and revised by @trustin

This commit removes the notion of unfolding from the codec framework
completely.  Unfolding was introduced in Netty 3.x to work around the
shortcoming of the codec framework where encode() and decode() did not
allow generating multiple messages.

Such a shortcoming can be fixed by changing the signature of encode()
and decode() instead of introducing an obscure workaround like
unfolding.  Therefore, we changed the signature of them in 4.0.

The change is simple, but backward-incompatible.  encode() and decode()
do not return anything.  Instead, the codec framework will pass a
MessageBuf<Object> so encode() and decode() can add the generated
messages into the MessageBuf.
This commit is contained in:
Norman Maurer 2013-04-03 11:32:33 +02:00 committed by Trustin Lee
parent cd0b5ec2db
commit af4b71a00e
44 changed files with 558 additions and 368 deletions

View File

@ -207,29 +207,6 @@ public abstract class AbstractMessageBuf<T> extends AbstractQueue<T> implements
return super.element();
}
@Override
@SuppressWarnings("unchecked")
public boolean unfoldAndAdd(Object o) {
if (o == null) {
return false;
}
if (o instanceof Object[]) {
Object[] a = (Object[]) o;
int i;
for (i = 0; i < a.length; i ++) {
Object m = a[i];
if (m == null) {
break;
}
add((T) m);
}
return i != 0;
}
return add((T) o);
}
@Override
public int drainTo(Collection<? super T> c) {
ensureAccessible();

View File

@ -194,11 +194,6 @@ public abstract class FilteredMessageBuf implements MessageBuf<Object> {
buf.clear();
}
@Override
public boolean unfoldAndAdd(Object o) {
return buf.unfoldAndAdd(o);
}
@Override
public int refCnt() {
return buf.refCnt();

View File

@ -25,17 +25,6 @@ import java.util.Queue;
*/
public interface MessageBuf<T> extends Buf, Queue<T> {
/**
* Unfold the specified object if necessary, and then add the unfolded objects (or the specified object if
* unfonding was not necessary) to this buffer. If the specified object is an object array ({@code Object[]}),
* this method adds the elements of the array to this buffer until {@code null} is encountered. If the specified
* object is {@code null}, nothing is added to this buffer. Otherwise, the specified object is added to this
* buffer as-is.
*
* @return {@code true} if one or more messages were added to this buffer. {@code false} otherwise.
*/
boolean unfoldAndAdd(Object o);
/**
* Drain the content of te {@link MessageBuf} to the given {@link Collection}.
*

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.http;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.FilteredMessageBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
@ -119,6 +120,7 @@ public final class HttpClientCodec
}
private final class Encoder extends HttpRequestEncoder {
@Override
protected void encode(
ChannelHandlerContext ctx, HttpObject msg, ByteBuf out) throws Exception {
@ -139,28 +141,32 @@ public final class HttpClientCodec
}
private final class Decoder extends HttpResponseDecoder {
Decoder(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
super(maxInitialLineLength, maxHeaderSize, maxChunkSize);
}
@Override
protected Object decode(
ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
protected void decode(
ChannelHandlerContext ctx, ByteBuf buffer, MessageBuf<Object> out) throws Exception {
if (done) {
int readable = actualReadableBytes();
if (readable == 0) {
// if non is readable just return null
// https://github.com/netty/netty/issues/1159
return null;
return;
}
return buffer.readBytes(readable);
out.add(buffer.readBytes(readable));
} else {
Object msg = super.decode(ctx, buffer);
if (failOnMissingResponse) {
decrement(msg);
out = new FilteredMessageBuf(out) {
@Override
protected Object filter(Object msg) {
decrement(msg);
return msg;
}
};
}
return msg;
super.decode(ctx, buffer, out);
}
}
@ -172,11 +178,6 @@ public final class HttpClientCodec
// check if it's an Header and its transfer encoding is not chunked.
if (msg instanceof LastHttpContent) {
requestResponseCounter.decrementAndGet();
} else if (msg instanceof Object[]) {
Object[] objects = (Object[]) msg;
for (Object obj: objects) {
decrement(obj);
}
}
}

View File

@ -18,6 +18,7 @@ package io.netty.handler.codec.http;
import io.netty.buffer.BufUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedByteChannel;
@ -50,14 +51,15 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObj
private boolean continueResponse;
@Override
protected Object decode(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
protected void decode(ChannelHandlerContext ctx, HttpObject msg, MessageBuf<Object> out) throws Exception {
if (msg instanceof HttpResponse && ((HttpResponse) msg).getStatus().code() == 100) {
// 100-continue response must be passed through.
BufUtil.retain(msg);
if (!(msg instanceof LastHttpContent)) {
continueResponse = true;
}
return msg;
// 100-continue response must be passed through.
out.add(BufUtil.retain(msg));
return;
}
if (continueResponse) {
@ -65,8 +67,8 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObj
continueResponse = false;
}
// 100-continue response must be passed through.
BufUtil.retain(msg);
return msg;
out.add(BufUtil.retain(msg));
return;
}
if (msg instanceof HttpMessage) {
@ -112,27 +114,34 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObj
HttpHeaders.Names.CONTENT_LENGTH,
Integer.toString(((ByteBufHolder) decoded[1]).data().readableBytes()));
}
return decoded;
for (Object obj: decoded) {
out.add(obj);
}
return;
}
if (c instanceof LastHttpContent) {
decodeStarted = false;
}
return new Object[] { message, c.retain() };
out.add(message);
out.add(c.retain());
return;
}
if (decoder != null) {
return decodeContent(null, c);
Object[] decoded = decodeContent(null, c);
for (Object obj: decoded) {
out.add(obj);
}
} else {
if (c instanceof LastHttpContent) {
decodeStarted = false;
}
return c.retain();
out.add(c.retain());
}
}
return null;
}
private Object[] decodeContent(HttpMessage header, HttpContent c) {

View File

@ -18,6 +18,7 @@ package io.netty.handler.codec.http;
import io.netty.buffer.BufUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedByteChannel;
@ -55,28 +56,29 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
private HttpMessage message;
private boolean encodeStarted;
private boolean continueResponse;
@Override
protected Object decode(ChannelHandlerContext ctx, HttpMessage msg)
protected void decode(ChannelHandlerContext ctx, HttpMessage msg, MessageBuf<Object> out)
throws Exception {
String acceptedEncoding = msg.headers().get(HttpHeaders.Names.ACCEPT_ENCODING);
if (acceptedEncoding == null) {
acceptedEncoding = HttpHeaders.Values.IDENTITY;
}
acceptEncodingQueue.add(acceptedEncoding);
BufUtil.retain(msg);
return msg;
out.add(BufUtil.retain(msg));
}
@Override
protected Object encode(ChannelHandlerContext ctx, HttpObject msg)
protected void encode(ChannelHandlerContext ctx, HttpObject msg, MessageBuf<Object> out)
throws Exception {
if (msg instanceof HttpResponse && ((HttpResponse) msg).getStatus().code() == 100) {
// 100-continue response must be passed through.
BufUtil.retain(msg);
if (!(msg instanceof LastHttpContent)) {
continueResponse = true;
}
return msg;
// 100-continue response must be passed through.
out.add(BufUtil.retain(msg));
return;
}
if (continueResponse) {
@ -84,8 +86,8 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
continueResponse = false;
}
// 100-continue response must be passed through.
BufUtil.retain(msg);
return msg;
out.add(BufUtil.retain(msg));
return;
}
// handle the case of single complete message without content
@ -97,7 +99,8 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
throw new IllegalStateException("cannot send more responses than requests");
}
return ((FullHttpMessage) msg).retain();
out.add(BufUtil.retain(msg));
return;
}
if (msg instanceof HttpMessage) {
@ -115,7 +118,8 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
message = new DefaultHttpResponse(res.getProtocolVersion(), res.getStatus());
message.headers().set(res.headers());
} else {
return msg;
out.add(msg);
return;
}
} else {
message = (HttpMessage) msg;
@ -144,9 +148,13 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
if (result == null) {
if (c instanceof LastHttpContent) {
encodeStarted = false;
return new Object[] { message, new DefaultLastHttpContent(c.data().retain()) };
out.add(message);
out.add(new DefaultLastHttpContent(c.data().retain()));
return;
} else {
return new Object[] { message, new DefaultHttpContent(c.data().retain()) };
out.add(message);
out.add(new DefaultHttpContent(c.data().retain()));
return;
}
}
@ -158,7 +166,7 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
HttpHeaders.Names.CONTENT_ENCODING,
result.targetContentEncoding());
Object[] encoded = encodeContent(message, c);
HttpObject[] encoded = encodeContent(message, c);
if (!HttpHeaders.isTransferEncodingChunked(message) && encoded.length == 3) {
if (headers.contains(HttpHeaders.Names.CONTENT_LENGTH)) {
@ -170,23 +178,29 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
Long.toString(length));
}
}
return encoded;
for (HttpObject obj: encoded) {
out.add(obj);
}
return;
}
if (encoder != null) {
return encodeContent(null, c);
HttpObject[] encoded = encodeContent(null, c);
for (HttpObject obj: encoded) {
out.add(obj);
}
return;
}
if (c instanceof LastHttpContent) {
encodeStarted = false;
}
return c.retain();
out.add(c.retain());
}
return null;
}
private Object[] encodeContent(HttpMessage header, HttpContent c) {
private HttpObject[] encodeContent(HttpMessage header, HttpContent c) {
ByteBuf newContent = Unpooled.buffer();
ByteBuf content = c.data();
encode(content, newContent);
@ -199,23 +213,24 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
// the last product on closure,
if (lastProduct.isReadable()) {
if (header == null) {
return new Object[] { new DefaultHttpContent(newContent), new DefaultLastHttpContent(lastProduct)};
return new HttpObject[] { new DefaultHttpContent(newContent),
new DefaultLastHttpContent(lastProduct)};
} else {
return new Object[] { header, new DefaultHttpContent(newContent),
return new HttpObject[] { header, new DefaultHttpContent(newContent),
new DefaultLastHttpContent(lastProduct)};
}
} else {
if (header == null) {
return new Object[] { new DefaultLastHttpContent(newContent) };
return new HttpObject[] { new DefaultLastHttpContent(newContent) };
} else {
return new Object[] { header, new DefaultLastHttpContent(newContent) };
return new HttpObject[] { header, new DefaultLastHttpContent(newContent) };
}
}
}
if (header == null) {
return new Object[] { new DefaultHttpContent(newContent) };
return new HttpObject[] { new DefaultHttpContent(newContent) };
} else {
return new Object[] { header, new DefaultHttpContent(newContent) };
return new HttpObject[] { header, new DefaultHttpContent(newContent) };
}
}

View File

@ -18,6 +18,7 @@ package io.netty.handler.codec.http;
import io.netty.buffer.BufUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
@ -106,7 +107,7 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
}
@Override
protected Object decode(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
protected void decode(ChannelHandlerContext ctx, HttpObject msg, MessageBuf<Object> out) throws Exception {
FullHttpMessage currentMessage = this.currentMessage;
if (msg instanceof HttpMessage) {
@ -126,8 +127,8 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
if (!m.getDecoderResult().isSuccess()) {
removeTransferEncodingChunked(m);
this.currentMessage = null;
BufUtil.retain(m);
return m;
out.add(BufUtil.retain(m));
return;
}
if (msg instanceof HttpRequest) {
HttpRequest header = (HttpRequest) msg;
@ -146,8 +147,6 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
// A streamed message - initialize the cumulative buffer, and wait for incoming chunks.
removeTransferEncodingChunked(currentMessage);
return null;
} else if (msg instanceof HttpContent) {
assert currentMessage != null;
@ -196,9 +195,7 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
String.valueOf(content.readableBytes()));
// All done
return currentMessage;
} else {
return null;
out.add(currentMessage);
}
} else {
throw new Error();

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.http;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
@ -167,7 +168,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, MessageBuf<Object> out) throws Exception {
switch (state()) {
case SKIP_CONTROL_CHARS: {
try {
@ -182,14 +183,15 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
if (initialLine.length < 3) {
// Invalid initial line - ignore.
checkpoint(State.SKIP_CONTROL_CHARS);
return null;
return;
}
message = createMessage(initialLine);
checkpoint(State.READ_HEADER);
} catch (Exception e) {
return invalidMessage(e);
out.add(invalidMessage(e));
return;
}
case READ_HEADER: try {
State nextState = readHeaders(buffer);
@ -199,16 +201,25 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
throw new IllegalArgumentException("Chunked messages not supported");
}
// Chunked encoding - generate HttpMessage first. HttpChunks will follow.
return message;
out.add(message);
return;
}
if (nextState == State.SKIP_CONTROL_CHARS) {
// No content is expected.
return reset();
HttpObject[] parts = reset();
for (HttpObject object: parts) {
out.add(object);
}
return;
}
long contentLength = HttpHeaders.getContentLength(message, -1);
if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {
content = Unpooled.EMPTY_BUFFER;
return reset();
HttpObject[] parts = reset();
for (HttpObject object: parts) {
out.add(object);
}
return;
}
switch (nextState) {
@ -219,30 +230,35 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
// chunkSize will be decreased as the READ_FIXED_LENGTH_CONTENT_AS_CHUNKS
// state reads data chunk by chunk.
chunkSize = HttpHeaders.getContentLength(message, -1);
return message;
out.add(message);
return;
}
break;
case READ_VARIABLE_LENGTH_CONTENT:
if (buffer.readableBytes() > maxChunkSize || HttpHeaders.is100ContinueExpected(message)) {
// Generate FullHttpMessage first. HttpChunks will follow.
checkpoint(State.READ_VARIABLE_LENGTH_CONTENT_AS_CHUNKS);
return message;
out.add(message);
return;
}
break;
default:
throw new IllegalStateException("Unexpected state: " + nextState);
}
// We return null here, this forces decode to be called again where we will decode the content
return null;
// We return here, this forces decode to be called again where we will decode the content
return;
} catch (Exception e) {
return invalidMessage(e);
out.add(invalidMessage(e));
return;
}
case READ_VARIABLE_LENGTH_CONTENT: {
int toRead = actualReadableBytes();
if (toRead > maxChunkSize) {
toRead = maxChunkSize;
}
return new Object[] { message, new DefaultHttpContent(buffer.readBytes(toRead))};
out.add(message);
out.add(new DefaultHttpContent(buffer.readBytes(toRead)));
return;
}
case READ_VARIABLE_LENGTH_CONTENT_AS_CHUNKS: {
// Keep reading data as a chunk until the end of connection is reached.
@ -253,12 +269,18 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
ByteBuf content = buffer.readBytes(toRead);
if (!buffer.isReadable()) {
reset();
return new DefaultLastHttpContent(content);
out.add(new DefaultLastHttpContent(content));
return;
}
return new DefaultHttpContent(content);
out.add(new DefaultHttpContent(content));
return;
}
case READ_FIXED_LENGTH_CONTENT: {
return readFixedLengthContent(buffer);
HttpObject[] parts = readFixedLengthContent(buffer);
for (HttpObject part: parts) {
out.add(part);
}
return;
}
case READ_FIXED_LENGTH_CONTENT_AS_CHUNKS: {
long chunkSize = this.chunkSize;
@ -271,7 +293,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
//
// See https://github.com/netty/netty/issues/433
if (readLimit == 0) {
return null;
return;
}
int toRead = readLimit;
@ -292,9 +314,11 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
if (chunkSize == 0) {
// Read all content.
reset();
return new DefaultLastHttpContent(content);
out.add(new DefaultLastHttpContent(content));
return;
}
return new DefaultHttpContent(content);
out.add(new DefaultHttpContent(content));
return;
}
/**
* everything else after this point takes care of reading chunked content. basically, read chunk size,
@ -306,7 +330,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
this.chunkSize = chunkSize;
if (chunkSize == 0) {
checkpoint(State.READ_CHUNK_FOOTER);
return null;
return;
} else if (chunkSize > maxChunkSize) {
// A chunk is too large. Split them into multiple chunks again.
checkpoint(State.READ_CHUNKED_CONTENT_AS_CHUNKS);
@ -314,13 +338,15 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
checkpoint(State.READ_CHUNKED_CONTENT);
}
} catch (Exception e) {
return invalidChunk(e);
out.add(invalidChunk(e));
return;
}
case READ_CHUNKED_CONTENT: {
assert chunkSize <= Integer.MAX_VALUE;
HttpContent chunk = new DefaultHttpContent(buffer.readBytes((int) chunkSize));
checkpoint(State.READ_CHUNK_DELIMITER);
return chunk;
out.add(chunk);
return;
}
case READ_CHUNKED_CONTENT_AS_CHUNKS: {
assert chunkSize <= Integer.MAX_VALUE;
@ -334,7 +360,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
//
// See https://github.com/netty/netty/issues/433
if (readLimit == 0) {
return null;
return;
}
int toRead = chunkSize;
@ -357,7 +383,8 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
checkpoint(State.READ_CHUNK_DELIMITER);
}
return chunk;
out.add(chunk);
return;
}
case READ_CHUNK_DELIMITER: {
for (;;) {
@ -365,11 +392,11 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
if (next == HttpConstants.CR) {
if (buffer.readByte() == HttpConstants.LF) {
checkpoint(State.READ_CHUNK_SIZE);
return null;
return;
}
} else if (next == HttpConstants.LF) {
checkpoint(State.READ_CHUNK_SIZE);
return null;
return;
} else {
checkpoint();
}
@ -379,19 +406,25 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
LastHttpContent trailer = readTrailingHeaders(buffer);
if (maxChunkSize == 0) {
// Chunked encoding disabled.
return reset();
HttpObject[] parts = reset();
for (HttpObject object: parts) {
out.add(object);
}
return;
} else {
reset();
// The last chunk, which is empty
return trailer;
out.add(trailer);
return;
}
} catch (Exception e) {
return invalidChunk(e);
out.add(invalidChunk(e));
return;
}
case BAD_MESSAGE: {
// Keep discarding until disconnection.
buffer.skipBytes(actualReadableBytes());
return null;
return;
}
default: {
throw new Error("Shouldn't reach here.");
@ -425,7 +458,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
return false;
}
private Object reset() {
private HttpObject[] reset() {
HttpMessage message = this.message;
ByteBuf content = this.content;
LastHttpContent httpContent;
@ -436,7 +469,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
httpContent = new DefaultLastHttpContent(content);
}
Object[] messages = { message, httpContent };
HttpObject[] messages = { message, httpContent };
this.content = null;
this.message = null;
@ -473,7 +506,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
}
}
private Object readFixedLengthContent(ByteBuf buffer) {
private HttpObject[] readFixedLengthContent(ByteBuf buffer) {
//we have a content-length so we just read the correct number of bytes
long length = HttpHeaders.getContentLength(message, -1);
assert length <= Integer.MAX_VALUE;
@ -483,7 +516,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
}
contentRead += toRead;
if (length < contentRead) {
return new Object[] {message, new DefaultHttpContent(buffer.readBytes(toRead))};
return new HttpObject[] {message, new DefaultHttpContent(buffer.readBytes(toRead))};
}
if (content == null) {
content = buffer.readBytes((int) length);

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.http.websocketx;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.TooLongFrameException;
@ -49,21 +50,21 @@ public class WebSocket00FrameDecoder extends ReplayingDecoder<Void> {
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
// Discard all data received if closing handshake was received before.
if (receivedClosingHandshake) {
in.skipBytes(actualReadableBytes());
return null;
return;
}
// Decode a frame otherwise.
byte type = in.readByte();
if ((type & 0x80) == 0x80) {
// If the MSB on type is set, decode the frame length
return decodeBinaryFrame(ctx, type, in);
out.add(decodeBinaryFrame(ctx, type, in));
} else {
// Decode a 0xff terminated UTF-8 string
return decodeTextFrame(ctx, in);
out.add(decodeTextFrame(ctx, in));
}
}

View File

@ -54,6 +54,7 @@
package io.netty.handler.codec.http.websocketx;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.CorruptedFrameException;
@ -117,12 +118,12 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
// Discard all data received if closing handshake was received before.
if (receivedClosingHandshake) {
in.skipBytes(actualReadableBytes());
return null;
return;
}
switch (state()) {
@ -148,31 +149,31 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
if (frameRsv != 0 && !allowExtensions) {
protocolViolation(ctx, "RSV != 0 and no extension negotiated, RSV:" + frameRsv);
return null;
return;
}
if (maskedPayload && !frameMasked) {
protocolViolation(ctx, "unmasked client to server frame");
return null;
return;
}
if (frameOpcode > 7) { // control frame (have MSB in opcode set)
// control frames MUST NOT be fragmented
if (!frameFinalFlag) {
protocolViolation(ctx, "fragmented control frame");
return null;
return;
}
// control frames MUST have payload 125 octets or less
if (framePayloadLen1 > 125) {
protocolViolation(ctx, "control frame with payload length > 125 octets");
return null;
return;
}
// check for reserved control frame opcodes
if (!(frameOpcode == OPCODE_CLOSE || frameOpcode == OPCODE_PING || frameOpcode == OPCODE_PONG)) {
protocolViolation(ctx, "control frame using reserved opcode " + frameOpcode);
return null;
return;
}
// close frame : if there is a body, the first two bytes of the
@ -180,25 +181,25 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
// order) representing a getStatus code
if (frameOpcode == 8 && framePayloadLen1 == 1) {
protocolViolation(ctx, "received close control frame with payload len 1");
return null;
return;
}
} else { // data frame
// check for reserved data frame opcodes
if (!(frameOpcode == OPCODE_CONT || frameOpcode == OPCODE_TEXT || frameOpcode == OPCODE_BINARY)) {
protocolViolation(ctx, "data frame using reserved opcode " + frameOpcode);
return null;
return;
}
// check opcode vs message fragmentation state 1/2
if (fragmentedFramesCount == 0 && frameOpcode == OPCODE_CONT) {
protocolViolation(ctx, "received continuation data frame outside fragmented message");
return null;
return;
}
// check opcode vs message fragmentation state 2/2
if (fragmentedFramesCount != 0 && frameOpcode != OPCODE_CONT && frameOpcode != OPCODE_PING) {
protocolViolation(ctx, "received non-continuation data frame while inside fragmented message");
return null;
return;
}
}
@ -207,7 +208,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
framePayloadLength = in.readUnsignedShort();
if (framePayloadLength < 126) {
protocolViolation(ctx, "invalid data frame length (not using minimal length encoding)");
return null;
return;
}
} else if (framePayloadLen1 == 127) {
framePayloadLength = in.readLong();
@ -216,7 +217,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
if (framePayloadLength < 65536) {
protocolViolation(ctx, "invalid data frame length (not using minimal length encoding)");
return null;
return;
}
} else {
framePayloadLength = framePayloadLen1;
@ -224,7 +225,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
if (framePayloadLength > maxFramePayloadLength) {
protocolViolation(ctx, "Max frame length of " + maxFramePayloadLength + " has been exceeded.");
return null;
return;
}
if (logger.isDebugEnabled()) {
@ -262,7 +263,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
framePayloadBytesRead += rbytes;
// Return null to wait for more bytes to arrive
return null;
return;
} else if (willHaveReadByteCount > framePayloadLength) {
// We have more than what we need so read up to the end of frame
// Leave the remainder in the buffer for next frame
@ -291,15 +292,18 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
// Processing ping/pong/close frames because they cannot be
// fragmented
if (frameOpcode == OPCODE_PING) {
return new PingWebSocketFrame(frameFinalFlag, frameRsv, framePayload);
out.add(new PingWebSocketFrame(frameFinalFlag, frameRsv, framePayload));
return;
}
if (frameOpcode == OPCODE_PONG) {
return new PongWebSocketFrame(frameFinalFlag, frameRsv, framePayload);
out.add(new PongWebSocketFrame(frameFinalFlag, frameRsv, framePayload));
return;
}
if (frameOpcode == OPCODE_CLOSE) {
checkCloseFrameBody(ctx, framePayload);
receivedClosingHandshake = true;
return new CloseWebSocketFrame(frameFinalFlag, frameRsv, framePayload);
out.add(new CloseWebSocketFrame(frameFinalFlag, frameRsv, framePayload));
return;
}
// Processing for possible fragmented messages for text and binary
@ -345,11 +349,14 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
// Return the frame
if (frameOpcode == OPCODE_TEXT) {
return new TextWebSocketFrame(frameFinalFlag, frameRsv, framePayload);
out.add(new TextWebSocketFrame(frameFinalFlag, frameRsv, framePayload));
return;
} else if (frameOpcode == OPCODE_BINARY) {
return new BinaryWebSocketFrame(frameFinalFlag, frameRsv, framePayload);
out.add(new BinaryWebSocketFrame(frameFinalFlag, frameRsv, framePayload));
return;
} else if (frameOpcode == OPCODE_CONT) {
return new ContinuationWebSocketFrame(frameFinalFlag, frameRsv, framePayload, aggregatedText);
out.add(new ContinuationWebSocketFrame(frameFinalFlag, frameRsv, framePayload, aggregatedText));
return;
} else {
throw new UnsupportedOperationException("Cannot decode web socket frame with opcode: "
+ frameOpcode);
@ -358,7 +365,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
// If we don't keep reading Netty will throw an exception saying
// we can't return null if no bytes read and state not changed.
in.readByte();
return null;
return;
default:
throw new Error("Shouldn't reach here.");
}

View File

@ -17,6 +17,7 @@ package io.netty.handler.codec.http.websocketx;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.TooLongFrameException;
@ -45,10 +46,11 @@ public class WebSocketFrameAggregator extends MessageToMessageDecoder<WebSocketF
}
@Override
protected Object decode(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, MessageBuf<Object> out) throws Exception {
if (currentFrame == null) {
if (msg.isFinalFragment()) {
return msg.retain();
out.add(msg.retain());
return;
}
ByteBuf buf = ctx.alloc().compositeBuffer().addComponent(msg.data().retain());
buf.writerIndex(buf.writerIndex() + msg.data().readableBytes());
@ -61,7 +63,7 @@ public class WebSocketFrameAggregator extends MessageToMessageDecoder<WebSocketF
throw new IllegalStateException(
"WebSocket frame was not of type TextWebSocketFrame or BinaryWebSocketFrame");
}
return null;
return;
}
if (msg instanceof ContinuationWebSocketFrame) {
CompositeByteBuf content = (CompositeByteBuf) currentFrame.data();
@ -76,13 +78,14 @@ public class WebSocketFrameAggregator extends MessageToMessageDecoder<WebSocketF
if (msg.isFinalFragment()) {
WebSocketFrame frame = this.currentFrame;
this.currentFrame = null;
return frame;
out.add(frame);
return;
} else {
return null;
return;
}
}
// It is possible to receive CLOSE/PING/PONG frames during fragmented frames so just pass them to the next
// handler in the chain
return msg.retain();
out.add(msg.retain());
}
}

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.spdy;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
@ -93,16 +94,16 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder {
}
@Override
public Object decodeLast(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
public void decodeLast(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
try {
return decode(ctx, in);
decode(ctx, in, out);
} finally {
headerBlockDecompressor.end();
}
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, MessageBuf<Object> out) throws Exception {
switch(state) {
case READ_COMMON_HEADER:
state = readCommonHeader(buffer);
@ -121,19 +122,20 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder {
if (streamID == 0) {
state = State.FRAME_ERROR;
fireProtocolException(ctx, "Received invalid data frame");
return null;
return;
}
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamID);
spdyDataFrame.setLast((flags & SPDY_DATA_FLAG_FIN) != 0);
state = State.READ_COMMON_HEADER;
return spdyDataFrame;
out.add(spdyDataFrame);
return;
}
// There are no length 0 control frames
state = State.READ_COMMON_HEADER;
}
return null;
return;
case READ_CONTROL_FRAME:
try {
@ -141,18 +143,19 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder {
if (frame != null) {
state = State.READ_COMMON_HEADER;
}
return frame;
out.add(frame);
return;
} catch (IllegalArgumentException e) {
state = State.FRAME_ERROR;
fireInvalidControlFrameException(ctx);
}
return null;
return;
case READ_SETTINGS_FRAME:
if (spdySettingsFrame == null) {
// Validate frame length against number of entries
if (buffer.readableBytes() < 4) {
return null;
return;
}
int numEntries = getUnsignedInt(buffer, buffer.readerIndex());
buffer.skipBytes(4);
@ -162,7 +165,7 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder {
if ((length & 0x07) != 0 || length >> 3 != numEntries) {
state = State.FRAME_ERROR;
fireInvalidControlFrameException(ctx);
return null;
return;
}
spdySettingsFrame = new DefaultSpdySettingsFrame();
@ -196,7 +199,7 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder {
state = State.FRAME_ERROR;
spdySettingsFrame = null;
fireInvalidControlFrameException(ctx);
return null;
return;
}
if (!spdySettingsFrame.isSet(ID)) {
@ -211,9 +214,10 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder {
state = State.READ_COMMON_HEADER;
Object frame = spdySettingsFrame;
spdySettingsFrame = null;
return frame;
out.add(frame);
return;
}
return null;
return;
case READ_HEADER_BLOCK_FRAME:
try {
@ -223,15 +227,16 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder {
state = State.READ_COMMON_HEADER;
Object frame = spdyHeaderBlock;
spdyHeaderBlock = null;
return frame;
out.add(frame);
return;
}
state = State.READ_HEADER_BLOCK;
}
return null;
return;
} catch (IllegalArgumentException e) {
state = State.FRAME_ERROR;
fireInvalidControlFrameException(ctx);
return null;
return;
}
case READ_HEADER_BLOCK:
@ -245,7 +250,7 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder {
spdyHeaderBlock = null;
decompressed = null;
ctx.fireExceptionCaught(e);
return null;
return;
}
if (spdyHeaderBlock != null && spdyHeaderBlock.isInvalid()) {
@ -255,22 +260,24 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder {
if (length == 0) {
state = State.READ_COMMON_HEADER;
}
return frame;
out.add(frame);
return;
}
if (length == 0) {
Object frame = spdyHeaderBlock;
spdyHeaderBlock = null;
state = State.READ_COMMON_HEADER;
return frame;
out.add(frame);
return;
}
return null;
return;
case READ_DATA_FRAME:
if (streamID == 0) {
state = State.FRAME_ERROR;
fireProtocolException(ctx, "Received invalid data frame");
return null;
return;
}
// Generate data frames that do not exceed maxChunkSize
@ -278,7 +285,7 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder {
// Wait until entire frame is readable
if (buffer.readableBytes() < dataLength) {
return null;
return;
}
ByteBuf data = ctx.alloc().buffer(dataLength);
@ -290,7 +297,8 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder {
spdyDataFrame.setLast((flags & SPDY_DATA_FLAG_FIN) != 0);
state = State.READ_COMMON_HEADER;
}
return spdyDataFrame;
out.add(spdyDataFrame);
return;
case DISCARD_FRAME:
int numBytes = Math.min(buffer.readableBytes(), length);
@ -299,11 +307,11 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder {
if (length == 0) {
state = State.READ_COMMON_HEADER;
}
return null;
return;
case FRAME_ERROR:
buffer.skipBytes(buffer.readableBytes());
return null;
return;
default:
throw new Error("Shouldn't reach here.");

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.spdy;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.TooLongFrameException;
@ -65,7 +66,8 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyDataOrControlFr
}
@Override
protected Object decode(ChannelHandlerContext ctx, SpdyDataOrControlFrame msg) throws Exception {
protected void decode(ChannelHandlerContext ctx, SpdyDataOrControlFrame msg, MessageBuf<Object> out)
throws Exception {
if (msg instanceof SpdySynStreamFrame) {
// HTTP requests/responses are mapped one-to-one to SPDY streams.
@ -106,7 +108,8 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyDataOrControlFr
if (spdySynStreamFrame.isLast()) {
HttpHeaders.setContentLength(httpResponseWithEntity, 0);
return httpResponseWithEntity;
out.add(httpResponseWithEntity);
return;
} else {
// Response body will follow in a series of Data Frames
messageMap.put(Integer.valueOf(streamID), httpResponseWithEntity);
@ -125,7 +128,8 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyDataOrControlFr
SpdyHttpHeaders.setStreamId(httpRequestWithEntity, streamID);
if (spdySynStreamFrame.isLast()) {
return httpRequestWithEntity;
out.add(httpRequestWithEntity);
return;
} else {
// Request body will follow in a series of Data Frames
messageMap.put(Integer.valueOf(streamID), httpRequestWithEntity);
@ -155,7 +159,8 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyDataOrControlFr
if (spdySynReplyFrame.isLast()) {
HttpHeaders.setContentLength(httpResponseWithEntity, 0);
return httpResponseWithEntity;
out.add(httpResponseWithEntity);
return;
} else {
// Response body will follow in a series of Data Frames
messageMap.put(Integer.valueOf(streamID), httpResponseWithEntity);
@ -177,7 +182,7 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyDataOrControlFr
// If message is not in map discard HEADERS frame.
// SpdySessionHandler should prevent this from happening.
if (httpMessage == null) {
return null;
return;
}
for (Map.Entry<String, String> e: spdyHeadersFrame.headers().entries()) {
@ -193,7 +198,7 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyDataOrControlFr
// If message is not in map discard Data Frame.
// SpdySessionHandler should prevent this from happening.
if (fullHttpMessage == null) {
return null;
return;
}
ByteBuf content = fullHttpMessage.data();
@ -210,7 +215,8 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyDataOrControlFr
if (spdyDataFrame.isLast()) {
HttpHeaders.setContentLength(fullHttpMessage, content.readableBytes());
messageMap.remove(streamID);
return fullHttpMessage;
out.add(fullHttpMessage);
return;
}
} else if (msg instanceof SpdyRstStreamFrame) {
@ -219,8 +225,6 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyDataOrControlFr
Integer streamID = Integer.valueOf(spdyRstStreamFrame.getStreamId());
messageMap.remove(streamID);
}
return null;
}
private static FullHttpRequest createHttpRequest(int spdyVersion, SpdyHeaderBlock requestFrame)

View File

@ -15,6 +15,7 @@
*/
package io.netty.handler.codec.spdy;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.UnsupportedMessageTypeException;
@ -27,7 +28,6 @@ import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -139,9 +139,7 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<HttpObject> {
}
@Override
protected Object encode(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
List<Object> out = new ArrayList<Object>();
protected void encode(ChannelHandlerContext ctx, HttpObject msg, MessageBuf<Object> out) throws Exception {
boolean valid = false;
@ -199,8 +197,6 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<HttpObject> {
if (!valid) {
throw new UnsupportedMessageTypeException(msg);
}
return out.toArray();
}
private SpdySynStreamFrame createSynStreamFrame(HttpMessage httpMessage)

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.spdy;
import io.netty.buffer.BufUtil;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.handler.codec.http.HttpMessage;
@ -39,18 +40,17 @@ public class SpdyHttpResponseStreamIdHandler extends
}
@Override
protected Object encode(ChannelHandlerContext ctx, HttpMessage msg) throws Exception {
protected void encode(ChannelHandlerContext ctx, HttpMessage msg, MessageBuf<Object> out) throws Exception {
Integer id = ids.poll();
if (id != null && id.intValue() != NO_ID && !msg.headers().contains(SpdyHttpHeaders.Names.STREAM_ID)) {
SpdyHttpHeaders.setStreamId(msg, id);
}
BufUtil.retain(msg);
return msg;
out.add(BufUtil.retain(msg));
}
@Override
protected Object decode(ChannelHandlerContext ctx, Object msg) throws Exception {
protected void decode(ChannelHandlerContext ctx, Object msg, MessageBuf<Object> out) throws Exception {
if (msg instanceof HttpMessage) {
boolean contains = ((HttpMessage) msg).headers().contains(SpdyHttpHeaders.Names.STREAM_ID);
if (!contains) {
@ -62,7 +62,6 @@ public class SpdyHttpResponseStreamIdHandler extends
ids.remove(((SpdyRstStreamFrame) msg).getStreamId());
}
BufUtil.retain(msg);
return msg;
out.add(BufUtil.retain(msg));
}
}

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.socks;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.util.CharsetUtil;
@ -42,7 +43,7 @@ public class SocksAuthRequestDecoder extends ReplayingDecoder<SocksAuthRequestDe
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, MessageBuf<Object> out) throws Exception {
switch (state()) {
case CHECK_PROTOCOL_VERSION: {
version = SocksSubnegotiationVersion.fromByte(byteBuf.readByte());
@ -63,7 +64,7 @@ public class SocksAuthRequestDecoder extends ReplayingDecoder<SocksAuthRequestDe
}
}
ctx.pipeline().remove(this);
return msg;
out.add(msg);
}
enum State {

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.socks;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
@ -39,7 +40,8 @@ public class SocksAuthResponseDecoder extends ReplayingDecoder<SocksAuthResponse
}
@Override
protected Object decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, MessageBuf<Object> out)
throws Exception {
switch (state()) {
case CHECK_PROTOCOL_VERSION: {
version = SocksSubnegotiationVersion.fromByte(byteBuf.readByte());
@ -54,7 +56,7 @@ public class SocksAuthResponseDecoder extends ReplayingDecoder<SocksAuthResponse
}
}
channelHandlerContext.pipeline().remove(this);
return msg;
out.add(msg);
}
enum State {

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.socks;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.util.CharsetUtil;
@ -45,7 +46,7 @@ public class SocksCmdRequestDecoder extends ReplayingDecoder<SocksCmdRequestDeco
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, MessageBuf<Object> out) throws Exception {
switch (state()) {
case CHECK_PROTOCOL_VERSION: {
version = SocksProtocolVersion.fromByte(byteBuf.readByte());
@ -87,7 +88,7 @@ public class SocksCmdRequestDecoder extends ReplayingDecoder<SocksCmdRequestDeco
}
}
ctx.pipeline().remove(this);
return msg;
out.add(msg);
}
enum State {

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.socks;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.util.CharsetUtil;
@ -45,7 +46,7 @@ public class SocksCmdResponseDecoder extends ReplayingDecoder<SocksCmdResponseDe
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, MessageBuf<Object> out) throws Exception {
switch (state()) {
case CHECK_PROTOCOL_VERSION: {
version = SocksProtocolVersion.fromByte(byteBuf.readByte());
@ -87,7 +88,7 @@ public class SocksCmdResponseDecoder extends ReplayingDecoder<SocksCmdResponseDe
}
}
ctx.pipeline().remove(this);
return msg;
out.add(msg);
}
enum State {

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.socks;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
@ -43,7 +44,7 @@ public class SocksInitRequestDecoder extends ReplayingDecoder<SocksInitRequestDe
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, MessageBuf<Object> out) throws Exception {
switch (state()) {
case CHECK_PROTOCOL_VERSION: {
version = SocksProtocolVersion.fromByte(byteBuf.readByte());
@ -63,7 +64,7 @@ public class SocksInitRequestDecoder extends ReplayingDecoder<SocksInitRequestDe
}
}
ctx.pipeline().remove(this);
return msg;
out.add(msg);
}
enum State {

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.socks;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
@ -40,7 +41,7 @@ public class SocksInitResponseDecoder extends ReplayingDecoder<SocksInitResponse
}
@Override
protected SocksResponse decode(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, MessageBuf<Object> out) throws Exception {
switch (state()) {
case CHECK_PROTOCOL_VERSION: {
version = SocksProtocolVersion.fromByte(byteBuf.readByte());
@ -56,7 +57,7 @@ public class SocksInitResponseDecoder extends ReplayingDecoder<SocksInitResponse
}
}
ctx.pipeline().remove(this);
return msg;
out.add(msg);
}
enum State {

View File

@ -42,13 +42,13 @@ public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler
private final ByteToMessageDecoder decoder = new ByteToMessageDecoder() {
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
return ByteToMessageCodec.this.decode(ctx, in);
public void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
ByteToMessageCodec.this.decode(ctx, in, out);
}
@Override
protected Object decodeLast(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
return ByteToMessageCodec.this.decodeLast(ctx, in);
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
ByteToMessageCodec.this.decodeLast(ctx, in, out);
}
};
@ -105,8 +105,8 @@ public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler
}
protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
protected abstract Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception;
protected Object decodeLast(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
return decode(ctx, in);
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception;
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
decode(ctx, in, out);
}
}

View File

@ -17,7 +17,9 @@ package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
@ -31,9 +33,9 @@ import io.netty.channel.ChannelInboundByteHandlerAdapter;
* <pre>
* public class SquareDecoder extends {@link ByteToMessageDecoder} {
* {@code @Override}
* public {@link Object} decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in)
* public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, {@link MessageBuf} out)
* throws {@link Exception} {
* return in.readBytes(in.readableBytes());
* out.add(in.readBytes(in.readableBytes()));
* }
* }
* </pre>
@ -43,6 +45,19 @@ public abstract class ByteToMessageDecoder
private volatile boolean singleDecode;
private boolean decodeWasNull;
private MessageBuf<Object> decoderOutput;
@Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
decoderOutput = Unpooled.messageBuffer();
return super.newInboundBuffer(ctx);
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
super.freeInboundBuffer(ctx);
decoderOutput.release();
}
/**
* If set then only one message is decoded on each {@link #inboundBufferUpdated(ChannelHandlerContext)} call.
@ -82,22 +97,33 @@ public abstract class ByteToMessageDecoder
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
MessageBuf<Object> out = decoderOutput();
try {
ByteBuf in = ctx.inboundByteBuffer();
if (in.isReadable()) {
callDecode(ctx, in);
}
decodeLast(ctx, in, out);
if (ctx.nextInboundMessageBuffer().unfoldAndAdd(decodeLast(ctx, in))) {
ctx.fireInboundBufferUpdated();
}
} catch (Throwable t) {
if (t instanceof CodecException) {
ctx.fireExceptionCaught(t);
throw (CodecException) t;
} else {
ctx.fireExceptionCaught(new DecoderException(t));
throw new DecoderException(t);
}
} finally {
boolean decoded = false;
for (;;) {
Object msg = out.poll();
if (msg == null) {
break;
}
decoded = true;
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
}
if (decoded) {
ctx.fireInboundBufferUpdated();
}
ctx.fireChannelInactive();
}
}
@ -106,12 +132,16 @@ public abstract class ByteToMessageDecoder
boolean wasNull = false;
boolean decoded = false;
MessageBuf<Object> out = ctx.nextInboundMessageBuffer();
MessageBuf<Object> out = decoderOutput();
assert out.isEmpty();
while (in.isReadable()) {
try {
int outSize = out.size();
int oldInputLength = in.readableBytes();
Object o = decode(ctx, in);
if (o == null) {
decode(ctx, in, out);
if (outSize == out.size()) {
wasNull = true;
if (oldInputLength == in.readableBytes()) {
break;
@ -119,33 +149,36 @@ public abstract class ByteToMessageDecoder
continue;
}
}
wasNull = false;
if (oldInputLength == in.readableBytes()) {
throw new IllegalStateException(
"decode() did not read anything but decoded a message.");
}
if (out.unfoldAndAdd(o)) {
decoded = true;
if (isSingleDecode()) {
break;
}
} else {
if (isSingleDecode()) {
break;
}
} catch (Throwable t) {
if (t instanceof CodecException) {
throw (CodecException) t;
} else {
throw new DecoderException(t);
}
} finally {
for (;;) {
Object msg = out.poll();
if (msg == null) {
break;
}
decoded = true;
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
}
if (decoded) {
decoded = false;
ctx.fireInboundBufferUpdated();
}
if (t instanceof CodecException) {
ctx.fireExceptionCaught(t);
} else {
ctx.fireExceptionCaught(new DecoderException(t));
}
break;
}
}
@ -166,20 +199,25 @@ public abstract class ByteToMessageDecoder
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToByteDecoder} belongs to
* @param in the {@link ByteBuf} from which to read data
* @return message the message to which the content of the {@link ByteBuf} was decoded, or {@code null} if
* there was not enough data left in the {@link ByteBuf} to decode.
* @param out the {@link MessageBuf} to which decoded messages should be added
* @throws Exception is thrown if an error accour
*/
protected abstract Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception;
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception;
/**
* Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
* {@link #channelInactive(ChannelHandlerContext)} was triggered.
*
* By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf)} but sub-classes may
* By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, MessageBuf)} but sub-classes may
* override this for some special cleanup operation.
*/
protected Object decodeLast(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
return decode(ctx, in);
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
decode(ctx, in, out);
}
final MessageBuf<Object> decoderOutput() {
return decoderOutput;
}
}

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
/**
@ -210,6 +211,13 @@ public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
if (lineBasedDecoder != null) {
return lineBasedDecoder.decode(ctx, buffer);

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil;
@ -75,6 +76,13 @@ public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < frameLength) {
return null;

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.serialization.ObjectDecoder;
@ -347,6 +348,13 @@ public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (discardingTooLongFrame) {
long bytesToDiscard = this.bytesToDiscard;
@ -446,12 +454,12 @@ public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
discardingTooLongFrame = false;
if (!failFast ||
failFast && firstDetectionOfTooLongFrame) {
fail(ctx, tooLongFrameLength);
fail(tooLongFrameLength);
}
} else {
// Keep discarding and notify handlers if necessary.
if (failFast && firstDetectionOfTooLongFrame) {
fail(ctx, tooLongFrameLength);
fail(tooLongFrameLength);
}
}
}
@ -473,17 +481,15 @@ public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
return frame;
}
private void fail(ChannelHandlerContext ctx, long frameLength) {
private void fail(long frameLength) {
if (frameLength > 0) {
ctx.fireExceptionCaught(
new TooLongFrameException(
throw new TooLongFrameException(
"Adjusted frame length exceeds " + maxFrameLength +
": " + frameLength + " - discarded"));
": " + frameLength + " - discarded");
} else {
ctx.fireExceptionCaught(
new TooLongFrameException(
throw new TooLongFrameException(
"Adjusted frame length exceeds " + maxFrameLength +
" - discarding"));
" - discarding");
}
}
}

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
/**
@ -67,6 +68,13 @@ public class LineBasedFrameDecoder extends ByteToMessageDecoder {
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
final int eol = findEndOfLine(buffer);
if (eol != -1) {

View File

@ -62,8 +62,8 @@ public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
@Override
@SuppressWarnings("unchecked")
protected Object encode(ChannelHandlerContext ctx, Object msg) throws Exception {
return MessageToMessageCodec.this.encode(ctx, (OUTBOUND_IN) msg);
protected void encode(ChannelHandlerContext ctx, Object msg, MessageBuf<Object> out) throws Exception {
MessageToMessageCodec.this.encode(ctx, (OUTBOUND_IN) msg, out);
}
};
@ -77,8 +77,8 @@ public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
@Override
@SuppressWarnings("unchecked")
protected Object decode(ChannelHandlerContext ctx, Object msg) throws Exception {
return MessageToMessageCodec.this.decode(ctx, (INBOUND_IN) msg);
protected void decode(ChannelHandlerContext ctx, Object msg, MessageBuf<Object> out) throws Exception {
MessageToMessageCodec.this.decode(ctx, (INBOUND_IN) msg, out);
}
};
@ -147,6 +147,6 @@ public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
return outboundMsgMatcher.match(msg);
}
protected abstract Object encode(ChannelHandlerContext ctx, OUTBOUND_IN msg) throws Exception;
protected abstract Object decode(ChannelHandlerContext ctx, INBOUND_IN msg) throws Exception;
protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, MessageBuf<Object> out) throws Exception;
protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, MessageBuf<Object> out) throws Exception;
}

View File

@ -16,7 +16,9 @@
package io.netty.handler.codec;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelInboundMessageHandler;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
@ -29,14 +31,11 @@ import io.netty.channel.ChannelInboundMessageHandlerAdapter;
* <pre>
* public class StringToIntegerDecoder extends
* {@link MessageToMessageDecoder}&lt;{@link String}&gt; {
* public StringToIntegerDecoder() {
* super(String.class);
* }
*
* {@code @Override}
* public {@link Object} decode({@link ChannelHandlerContext} ctx, {@link String} message)
* throws {@link Exception} {
* return message.length());
* public void decode({@link ChannelHandlerContext} ctx, {@link String} message,
* {@link MessageBuf} out) throws {@link Exception} {
* out.add(message.length());
* }
* }
* </pre>
@ -44,6 +43,14 @@ import io.netty.channel.ChannelInboundMessageHandlerAdapter;
*/
public abstract class MessageToMessageDecoder<I> extends ChannelInboundMessageHandlerAdapter<I> {
private static final ThreadLocal<MessageBuf<Object>> decoderOutput =
new ThreadLocal<MessageBuf<Object>>() {
@Override
protected MessageBuf<Object> initialValue() {
return Unpooled.messageBuffer();
}
};
protected MessageToMessageDecoder() { }
protected MessageToMessageDecoder(Class<? extends I> inboundMessageType) {
@ -52,7 +59,18 @@ public abstract class MessageToMessageDecoder<I> extends ChannelInboundMessageHa
@Override
public final void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception {
ctx.nextInboundMessageBuffer().unfoldAndAdd(decode(ctx, msg));
MessageBuf<Object> out = decoderOutput.get();
try {
decode(ctx, msg, out);
} finally {
for (;;) {
Object obj = out.poll();
if (obj == null) {
break;
}
ChannelHandlerUtil.addToNextInboundBuffer(ctx, obj);
}
}
}
/**
@ -61,9 +79,8 @@ public abstract class MessageToMessageDecoder<I> extends ChannelInboundMessageHa
*
* @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageDecoder} belongs to
* @param msg the message to decode to an other one
* @return message the decoded message or {@code null} if more messages are needed be cause the implementation
* needs to do some kind of aggragation
* @param out the {@link MessageBuf} to which decoded messages should be added
* @throws Exception is thrown if an error accour
*/
protected abstract Object decode(ChannelHandlerContext ctx, I msg) throws Exception;
protected abstract void decode(ChannelHandlerContext ctx, I msg, MessageBuf<Object> out) throws Exception;
}

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
@ -28,20 +29,24 @@ import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
* <pre>
* public class IntegerToStringEncoder extends
* {@link MessageToMessageEncoder}&lt;{@link Integer}&gt; {
* public StringToIntegerDecoder() {
* super(String.class);
* }
*
* {@code @Override}
* public {@link Object} encode({@link ChannelHandlerContext} ctx, {@link Integer} message)
* public void encode({@link ChannelHandlerContext} ctx, {@link Integer} message, {@link MessageBuf} out)
* throws {@link Exception} {
* return message.toString();
* out.add(message.toString());
* }
* }
* </pre>
*
*/
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> {
private static final ThreadLocal<MessageBuf<Object>> encoderOutput =
new ThreadLocal<MessageBuf<Object>>() {
@Override
protected MessageBuf<Object> initialValue() {
return Unpooled.messageBuffer();
}
};
protected MessageToMessageEncoder() { }
@ -51,16 +56,30 @@ public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageH
@Override
public final void flush(ChannelHandlerContext ctx, I msg) throws Exception {
try {
Object encoded = encode(ctx, msg);
// Handle special case when the encoded output is a ByteBuf and the next handler in the pipeline
// accept bytes. Related to #1222
ChannelHandlerUtil.addToNextOutboundBuffer(ctx, encoded);
MessageBuf<Object> out = encoderOutput.get();
assert out.isEmpty();
try {
encode(ctx, msg, out);
} catch (CodecException e) {
throw e;
} catch (Exception e) {
throw new CodecException(e);
} catch (Throwable cause) {
if (cause instanceof CodecException) {
throw (CodecException) cause;
} else {
throw new EncoderException(cause);
}
} finally {
for (;;) {
Object encoded = out.poll();
if (encoded == null) {
break;
}
// Handle special case when the encoded output is a ByteBuf and the next handler in the pipeline
// accept bytes. Related to #1222
ChannelHandlerUtil.addToNextOutboundBuffer(ctx, encoded);
}
}
}
@ -70,9 +89,9 @@ public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageH
*
* @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageEncoder} belongs to
* @param msg the message to encode to an other one
* @return message the encoded message or {@code null} if more messages are needed be cause the implementation
* @param out the {@link MessageBuf} into which the encoded msg should be added
* needs to do some kind of aggragation
* @throws Exception is thrown if an error accour
*/
protected abstract Object encode(ChannelHandlerContext ctx, I msg) throws Exception;
protected abstract void encode(ChannelHandlerContext ctx, I msg, MessageBuf<Object> out) throws Exception;
}

View File

@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelPipeline;
import io.netty.util.Signal;
@ -364,6 +365,8 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
MessageBuf<Object> out = decoderOutput();
try {
replayable.terminate();
ByteBuf in = cumulation;
@ -371,19 +374,31 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
callDecode(ctx, in);
}
if (ctx.nextInboundMessageBuffer().unfoldAndAdd(decodeLast(ctx, replayable))) {
ctx.fireInboundBufferUpdated();
}
decodeLast(ctx, replayable, out);
} catch (Signal replay) {
// Ignore
replay.expect(REPLAY);
} catch (Throwable t) {
if (t instanceof CodecException) {
ctx.fireExceptionCaught(t);
throw (CodecException) t;
} else {
ctx.fireExceptionCaught(new DecoderException(t));
throw new DecoderException(t);
}
} finally {
boolean decoded = false;
for (;;) {
Object msg = out.poll();
if (msg == null) {
break;
}
decoded = true;
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
}
if (decoded) {
ctx.fireInboundBufferUpdated();
}
ctx.fireChannelInactive();
}
}
@ -393,16 +408,17 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
boolean wasNull = false;
ByteBuf in = cumulation;
MessageBuf<Object> out = ctx.nextInboundMessageBuffer();
MessageBuf<Object> out = decoderOutput();
boolean decoded = false;
while (in.isReadable()) {
try {
int oldReaderIndex = checkpoint = in.readerIndex();
Object result = null;
int outSize = out.size();
S oldState = state;
try {
result = decode(ctx, replayable);
if (result == null) {
decode(ctx, replayable, out);
if (outSize == out.size()) {
wasNull = true;
if (oldReaderIndex == in.readerIndex() && oldState == state) {
throw new IllegalStateException(
"null cannot be returned if no data is consumed and state didn't change.");
@ -422,13 +438,6 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
// Called by cleanup() - no need to maintain the readerIndex
// anymore because the buffer has been released already.
}
}
if (result == null) {
wasNull = true;
// Seems like more data is required.
// Let us wait for the next notification.
break;
}
wasNull = false;
@ -439,27 +448,26 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
"if it returned a decoded message (caused by: " +
getClass() + ')');
}
} catch (Throwable t) {
if (t instanceof CodecException) {
throw (CodecException) t;
} else {
throw new DecoderException(t);
}
} finally {
// A successful decode
if (out.unfoldAndAdd(result)) {
decoded = true;
if (isSingleDecode()) {
for (;;) {
Object msg = out.poll();
if (msg == null) {
break;
}
decoded = true;
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
}
} catch (Throwable t) {
if (decoded) {
decoded = false;
ctx.fireInboundBufferUpdated();
}
if (t instanceof CodecException) {
ctx.fireExceptionCaught(t);
} else {
ctx.fireExceptionCaught(new DecoderException(t));
}
break;
}
}

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.base64;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
@ -58,7 +59,7 @@ public class Base64Decoder extends MessageToMessageDecoder<ByteBuf> {
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
return Base64.decode(msg, msg.readerIndex(), msg.readableBytes(), dialect);
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, MessageBuf<Object> out) throws Exception {
out.add(Base64.decode(msg, msg.readerIndex(), msg.readableBytes(), dialect));
}
}

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.bytes;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
@ -49,7 +50,7 @@ import io.netty.handler.codec.MessageToMessageDecoder;
public class ByteArrayDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, MessageBuf<Object> out) throws Exception {
byte[] array;
if (msg.hasArray()) {
if (msg.arrayOffset() == 0 && msg.readableBytes() == msg.capacity()) {
@ -67,6 +68,6 @@ public class ByteArrayDecoder extends MessageToMessageDecoder<ByteBuf> {
msg.getBytes(0, array);
}
return array;
out.add(array);
}
}

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.marshalling;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
@ -54,11 +55,11 @@ public class CompatibleMarshallingDecoder extends ReplayingDecoder<Void> {
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, MessageBuf<Object> out) throws Exception {
if (discardingTooLongFrame) {
buffer.skipBytes(actualReadableBytes());
checkpoint();
return null;
return;
}
Unmarshaller unmarshaller = provider.getUnmarshaller(ctx);
@ -70,7 +71,7 @@ public class CompatibleMarshallingDecoder extends ReplayingDecoder<Void> {
unmarshaller.start(input);
Object obj = unmarshaller.readObject();
unmarshaller.finish();
return obj;
out.add(obj);
} catch (LimitingByteInput.TooBigObjectException e) {
discardingTooLongFrame = true;
throw new TooLongFrameException();
@ -82,19 +83,19 @@ public class CompatibleMarshallingDecoder extends ReplayingDecoder<Void> {
}
@Override
protected Object decodeLast(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf buffer, MessageBuf<Object> out) throws Exception {
switch (buffer.readableBytes()) {
case 0:
return null;
return;
case 1:
// Ignore the last TC_RESET
if (buffer.getByte(buffer.readerIndex()) == ObjectStreamConstants.TC_RESET) {
buffer.skipBytes(1);
return null;
return;
}
}
return decode(ctx, buffer);
decode(ctx, buffer, out);
}
@Override

View File

@ -19,6 +19,7 @@ import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.Message;
import com.google.protobuf.MessageLite;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
@ -94,7 +95,7 @@ public class ProtobufDecoder extends MessageToMessageDecoder<ByteBuf> {
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, MessageBuf<Object> out) throws Exception {
final byte[] array;
final int offset;
final int length = msg.readableBytes();
@ -109,15 +110,15 @@ public class ProtobufDecoder extends MessageToMessageDecoder<ByteBuf> {
if (extensionRegistry == null) {
if (HAS_PARSER) {
return prototype.getParserForType().parseFrom(array, offset, length);
out.add(prototype.getParserForType().parseFrom(array, offset, length));
} else {
return prototype.newBuilderForType().mergeFrom(array, offset, length).build();
out.add(prototype.newBuilderForType().mergeFrom(array, offset, length).build());
}
} else {
if (HAS_PARSER) {
return prototype.getParserForType().parseFrom(array, offset, length, extensionRegistry);
out.add(prototype.getParserForType().parseFrom(array, offset, length, extensionRegistry));
} else {
return prototype.newBuilderForType().mergeFrom(array, offset, length, extensionRegistry).build();
out.add(prototype.newBuilderForType().mergeFrom(array, offset, length, extensionRegistry).build());
}
}
}

View File

@ -19,6 +19,7 @@ import com.google.protobuf.Message;
import com.google.protobuf.MessageLite;
import com.google.protobuf.MessageLiteOrBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
@ -59,13 +60,14 @@ import static io.netty.buffer.Unpooled.*;
public class ProtobufEncoder extends MessageToMessageEncoder<MessageLiteOrBuilder> {
@Override
protected Object encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg) throws Exception {
protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, MessageBuf<Object> out)
throws Exception {
if (msg instanceof MessageLite) {
return wrappedBuffer(((MessageLite) msg).toByteArray());
out.add(wrappedBuffer(((MessageLite) msg).toByteArray()));
return;
}
if (msg instanceof MessageLite.Builder) {
return wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray());
out.add(wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray()));
}
return null;
}
}

View File

@ -17,6 +17,7 @@ package io.netty.handler.codec.protobuf;
import com.google.protobuf.CodedInputStream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.CorruptedFrameException;
@ -42,13 +43,13 @@ public class ProtobufVarint32FrameDecoder extends ByteToMessageDecoder {
// (just like LengthFieldBasedFrameDecoder)
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
in.markReaderIndex();
final byte[] buf = new byte[5];
for (int i = 0; i < buf.length; i ++) {
if (!in.isReadable()) {
in.resetReaderIndex();
return null;
return;
}
buf[i] = in.readByte();
@ -60,9 +61,10 @@ public class ProtobufVarint32FrameDecoder extends ByteToMessageDecoder {
if (in.readableBytes() < length) {
in.resetReaderIndex();
return null;
return;
} else {
return in.readBytes(length);
out.add(in.readBytes(length));
return;
}
}
}

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.string;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
@ -74,7 +75,7 @@ public class StringDecoder extends MessageToMessageDecoder<ByteBuf> {
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
return msg.toString(charset);
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, MessageBuf<Object> out) throws Exception {
out.add(msg.toString(charset));
}
}

View File

@ -17,6 +17,7 @@ package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufIndexFinder;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedByteChannel;
@ -53,10 +54,10 @@ public class ReplayingDecoderTest {
}
@Override
protected ByteBuf decode(ChannelHandlerContext ctx, ByteBuf in) {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) {
ByteBuf msg = in.readBytes(in.bytesBefore(ByteBufIndexFinder.LF));
in.skipBytes(1);
return msg;
out.add(msg);
}
}
}

View File

@ -19,8 +19,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.embedded.EmbeddedByteChannel;
import io.netty.handler.codec.CodecException;
import io.netty.handler.codec.TooLongFrameException;
import org.jboss.marshalling.Marshaller;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
@ -112,12 +110,12 @@ public abstract class AbstractCompatibleMarshallingDecoderTest {
marshaller.close();
byte[] testBytes = bout.toByteArray();
try {
ch.writeInbound(input(testBytes));
fail();
} catch (CodecException e) {
assertEquals(TooLongFrameException.class, e.getClass());
}
onTooBigFrame(ch, input(testBytes));
}
protected void onTooBigFrame(EmbeddedByteChannel ch, ByteBuf input) {
ch.writeInbound(input);
assertFalse(ch.isActive());
}
protected ChannelHandler createDecoder(int maxObjectSize) {

View File

@ -18,6 +18,11 @@ package io.netty.handler.codec.marshalling;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.embedded.EmbeddedByteChannel;
import io.netty.handler.codec.CodecException;
import io.netty.handler.codec.TooLongFrameException;
import static org.junit.Assert.*;
public class RiverMarshallingDecoderTest extends RiverCompatibleMarshallingDecoderTest {
@ -34,4 +39,13 @@ public class RiverMarshallingDecoderTest extends RiverCompatibleMarshallingDecod
createMarshallingConfig()), maxObjectSize);
}
@Override
protected void onTooBigFrame(EmbeddedByteChannel ch, ByteBuf input) {
try {
ch.writeInbound(input);
fail();
} catch (CodecException e) {
assertEquals(TooLongFrameException.class, e.getClass());
}
}
}

View File

@ -18,6 +18,11 @@ package io.netty.handler.codec.marshalling;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.embedded.EmbeddedByteChannel;
import io.netty.handler.codec.CodecException;
import io.netty.handler.codec.TooLongFrameException;
import static org.junit.Assert.*;
public class SerialMarshallingDecoderTest extends SerialCompatibleMarshallingDecoderTest {
@ -34,4 +39,13 @@ public class SerialMarshallingDecoderTest extends SerialCompatibleMarshallingDec
createMarshallingConfig()), maxObjectSize);
}
@Override
protected void onTooBigFrame(EmbeddedByteChannel ch, ByteBuf input) {
try {
ch.writeInbound(input);
fail();
} catch (CodecException e) {
assertEquals(TooLongFrameException.class, e.getClass());
}
}
}

View File

@ -16,6 +16,7 @@
package io.netty.example.factorial;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.CorruptedFrameException;
@ -31,10 +32,10 @@ import java.math.BigInteger;
public class BigIntegerDecoder extends ByteToMessageDecoder {
@Override
protected BigInteger decode(ChannelHandlerContext ctx, ByteBuf in) {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) {
// Wait until the length prefix is available.
if (in.readableBytes() < 5) {
return null;
return;
}
in.markReaderIndex();
@ -51,13 +52,13 @@ public class BigIntegerDecoder extends ByteToMessageDecoder {
int dataLength = in.readInt();
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return null;
return;
}
// Convert the received data into a new BigInteger.
byte[] decoded = new byte[dataLength];
in.readBytes(decoded);
return new BigInteger(decoded);
out.add(new BigInteger(decoded));
}
}

View File

@ -221,7 +221,7 @@ public final class ChannelHandlerUtil {
return true;
}
}
return ctx.nextOutboundMessageBuffer().unfoldAndAdd(msg);
return ctx.nextOutboundMessageBuffer().add(msg);
}
/**
@ -235,7 +235,7 @@ public final class ChannelHandlerUtil {
return true;
}
}
return ctx.nextInboundMessageBuffer().unfoldAndAdd(msg);
return ctx.nextInboundMessageBuffer().add(msg);
}
private ChannelHandlerUtil() { }