Add isDecodable/isEncodable() to codecs to support stacked codecs

This commit is contained in:
Trustin Lee 2012-05-29 13:34:01 -07:00
parent 026715e818
commit b10cf29393
27 changed files with 220 additions and 69 deletions

View File

@ -71,6 +71,10 @@ public class HttpChunkAggregator extends MessageToMessageDecoder<Object, HttpMes
this.maxContentLength = maxContentLength;
}
@Override
public boolean isDecodable(Object msg) throws Exception {
return msg instanceof HttpMessage || msg instanceof HttpChunk;
}
@Override
public HttpMessage decode(ChannelInboundHandlerContext<Object> ctx, Object msg) throws Exception {

View File

@ -50,6 +50,11 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<Object,
protected HttpContentDecoder() {
}
@Override
public boolean isDecodable(Object msg) throws Exception {
return msg instanceof HttpMessage || msg instanceof HttpChunk;
}
@Override
public Object decode(ChannelInboundHandlerContext<Object> ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse && ((HttpResponse) msg).getStatus().getCode() == 100) {

View File

@ -47,7 +47,7 @@ import java.util.Queue;
* so that this handler can intercept HTTP responses before {@link HttpMessageEncoder}
* converts them into {@link ChannelBuffer}s.
*/
public abstract class HttpContentEncoder extends MessageToMessageCodec<Object, Object, Object, Object> {
public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessage, HttpMessage, Object, Object> {
private final Queue<String> acceptEncodingQueue = QueueFactory.createQueue(String.class);
private volatile EncoderEmbedder<ChannelBuffer> encoder;
@ -59,20 +59,25 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<Object, O
}
@Override
public Object decode(ChannelInboundHandlerContext<Object> ctx, Object msg)
throws Exception {
if (!(msg instanceof HttpMessage)) {
return msg;
}
public boolean isDecodable(Object msg) throws Exception {
return msg instanceof HttpMessage;
}
HttpMessage m = (HttpMessage) msg;
String acceptedEncoding = m.getHeader(HttpHeaders.Names.ACCEPT_ENCODING);
@Override
public HttpMessage decode(ChannelInboundHandlerContext<HttpMessage> ctx, HttpMessage msg)
throws Exception {
String acceptedEncoding = msg.getHeader(HttpHeaders.Names.ACCEPT_ENCODING);
if (acceptedEncoding == null) {
acceptedEncoding = HttpHeaders.Values.IDENTITY;
}
boolean offered = acceptEncodingQueue.offer(acceptedEncoding);
assert offered;
return m;
return msg;
}
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof HttpMessage || msg instanceof HttpChunk;
}
@Override

View File

@ -55,6 +55,11 @@ public abstract class HttpMessageEncoder extends MessageToStreamEncoder<Object>
protected HttpMessageEncoder() {
}
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof HttpMessage || msg instanceof HttpChunk;
}
@Override
public void encode(ChannelOutboundHandlerContext<Object> ctx, Object msg, ChannelBuffer out) throws Exception {
if (msg instanceof HttpMessage) {

View File

@ -32,6 +32,11 @@ import io.netty.handler.codec.MessageToStreamEncoder;
@Sharable
public class WebSocket00FrameEncoder extends MessageToStreamEncoder<WebSocketFrame> {
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof WebSocketFrame;
}
@Override
public void encode(
ChannelOutboundHandlerContext<WebSocketFrame> ctx,

View File

@ -93,6 +93,11 @@ public class WebSocket08FrameEncoder extends MessageToStreamEncoder<WebSocketFra
this.maskPayload = maskPayload;
}
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof WebSocketFrame;
}
@Override
public void encode(ChannelOutboundHandlerContext<WebSocketFrame> ctx,
WebSocketFrame msg, ChannelBuffer out) throws Exception {

View File

@ -74,6 +74,19 @@ public class SpdyFrameEncoder extends MessageToStreamEncoder<Object> {
}
}
@Override
public boolean isEncodable(Object msg) throws Exception {
// TODO: Introduce a super type.
return msg instanceof SpdyDataFrame ||
msg instanceof SpdySynStreamFrame ||
msg instanceof SpdySynReplyFrame ||
msg instanceof SpdyRstStreamFrame ||
msg instanceof SpdySettingsFrame ||
msg instanceof SpdyNoOpFrame ||
msg instanceof SpdyGoAwayFrame ||
msg instanceof SpdyHeadersFrame;
}
@Override
public void encode(ChannelOutboundHandlerContext<Object> ctx, Object msg,
ChannelBuffer out) throws Exception {

View File

@ -59,6 +59,13 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<Object, Object> {
this.maxContentLength = maxContentLength;
}
@Override
public boolean isDecodable(Object msg) throws Exception {
return msg instanceof SpdySynStreamFrame ||
msg instanceof SpdySynReplyFrame ||
msg instanceof SpdyHeadersFrame ||
msg instanceof SpdyDataFrame;
}
@Override
public Object decode(ChannelInboundHandlerContext<Object> ctx, Object msg)

View File

@ -110,6 +110,10 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<Object, Object> {
private volatile int currentStreamID;
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof HttpMessage || msg instanceof HttpChunk;
}
@Override
public Object encode(ChannelOutboundHandlerContext<Object> ctx, Object msg)

View File

@ -96,6 +96,11 @@ public class LengthFieldPrepender extends MessageToStreamEncoder<ChannelBuffer>
this.lengthIncludesLengthFieldLength = lengthIncludesLengthFieldLength;
}
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof ChannelBuffer;
}
@Override
public void encode(
ChannelOutboundHandlerContext<ChannelBuffer> ctx,

View File

@ -11,6 +11,11 @@ public abstract class MessageToMessageCodec<INBOUND_IN, INBOUND_OUT, OUTBOUND_IN
private final MessageToMessageEncoder<OUTBOUND_IN, OUTBOUND_OUT> encoder =
new MessageToMessageEncoder<OUTBOUND_IN, OUTBOUND_OUT>() {
@Override
public boolean isEncodable(Object msg) throws Exception {
return MessageToMessageCodec.this.isEncodable(msg);
}
@Override
public OUTBOUND_OUT encode(ChannelOutboundHandlerContext<OUTBOUND_IN> ctx, OUTBOUND_IN msg) throws Exception {
return MessageToMessageCodec.this.encode(ctx, msg);
@ -19,6 +24,11 @@ public abstract class MessageToMessageCodec<INBOUND_IN, INBOUND_OUT, OUTBOUND_IN
private final MessageToMessageDecoder<INBOUND_IN, INBOUND_OUT> decoder =
new MessageToMessageDecoder<INBOUND_IN, INBOUND_OUT>() {
@Override
public boolean isDecodable(Object msg) throws Exception {
return MessageToMessageCodec.this.isDecodable(msg);
}
@Override
public INBOUND_OUT decode(ChannelInboundHandlerContext<INBOUND_IN> ctx, INBOUND_IN msg) throws Exception {
return MessageToMessageCodec.this.decode(ctx, msg);
@ -46,6 +56,24 @@ public abstract class MessageToMessageCodec<INBOUND_IN, INBOUND_OUT, OUTBOUND_IN
encoder.flush(ctx, future);
}
/**
* Returns {@code true} if and only if the specified message can be decoded by this codec.
*
* @param msg the message
*/
public boolean isDecodable(Object msg) throws Exception {
return true;
}
/**
* Returns {@code true} if and only if the specified message can be encoded by this codec.
*
* @param msg the message
*/
public boolean isEncodable(Object msg) throws Exception {
return true;
}
public abstract OUTBOUND_OUT encode(ChannelOutboundHandlerContext<OUTBOUND_IN> ctx, OUTBOUND_IN msg) throws Exception;
public abstract INBOUND_OUT decode(ChannelInboundHandlerContext<INBOUND_IN> ctx, INBOUND_IN msg) throws Exception;
}

View File

@ -20,23 +20,30 @@ public abstract class MessageToMessageDecoder<I, O> extends ChannelInboundHandle
public void inboundBufferUpdated(ChannelInboundHandlerContext<I> ctx)
throws Exception {
Queue<I> in = ctx.inbound().messageBuffer();
boolean decoded = false;
boolean notify = false;
for (;;) {
try {
I msg = in.poll();
Object msg = in.poll();
if (msg == null) {
break;
}
if (!isDecodable(msg)) {
ctx.nextInboundMessageBuffer().add(msg);
notify = true;
continue;
}
O emsg = decode(ctx, msg);
if (emsg == null) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
O omsg = decode(ctx, imsg);
if (omsg == null) {
// Decoder consumed a message but returned null.
// Probably it needs more messages because it's an aggregator.
continue;
}
if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), emsg)) {
decoded = true;
if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), omsg)) {
notify = true;
}
} catch (Throwable t) {
if (t instanceof CodecException) {
@ -46,10 +53,19 @@ public abstract class MessageToMessageDecoder<I, O> extends ChannelInboundHandle
}
}
}
if (decoded) {
if (notify) {
ctx.fireInboundBufferUpdated();
}
}
/**
* Returns {@code true} if and only if the specified message can be decoded by this decoder.
*
* @param msg the message
*/
public boolean isDecodable(Object msg) throws Exception {
return true;
}
public abstract O decode(ChannelInboundHandlerContext<I> ctx, I msg) throws Exception;
}

View File

@ -20,23 +20,31 @@ public abstract class MessageToMessageEncoder<I, O> extends ChannelOutboundHandl
@Override
public void flush(ChannelOutboundHandlerContext<I> ctx, ChannelFuture future) throws Exception {
Queue<I> in = ctx.outbound().messageBuffer();
boolean encoded = false;
boolean notify = false;
for (;;) {
try {
I msg = in.poll();
Object msg = in.poll();
if (msg == null) {
break;
}
O emsg = encode(ctx, msg);
if (emsg == null) {
if (!isEncodable(msg)) {
ctx.nextOutboundMessageBuffer().add(msg);
notify = true;
continue;
}
@SuppressWarnings("unchecked")
I imsg = (I) msg;
O omsg = encode(ctx, imsg);
if (omsg == null) {
// encode() might be waiting for more inbound messages to generate
// an aggregated message - keep polling.
continue;
}
if (unfoldAndAdd(ctx, ctx.nextOutboundMessageBuffer(), emsg)) {
encoded = true;
if (unfoldAndAdd(ctx, ctx.nextOutboundMessageBuffer(), omsg)) {
notify = true;
}
} catch (Throwable t) {
if (t instanceof CodecException) {
@ -47,11 +55,20 @@ public abstract class MessageToMessageEncoder<I, O> extends ChannelOutboundHandl
}
}
if (encoded) {
if (notify) {
ctx.flush(future);
}
}
/**
* Returns {@code true} if and only if the specified message can be encoded by this encoder.
*
* @param msg the message
*/
public boolean isEncodable(Object msg) throws Exception {
return true;
}
public abstract O encode(ChannelOutboundHandlerContext<I> ctx, I msg) throws Exception;
static <T> boolean unfoldAndAdd(

View File

@ -22,15 +22,24 @@ public abstract class MessageToStreamEncoder<I> extends ChannelOutboundHandlerAd
Queue<I> in = ctx.outbound().messageBuffer();
ChannelBuffer out = ctx.nextOutboundByteBuffer();
boolean notify = false;
int oldOutSize = out.readableBytes();
for (;;) {
I msg = in.poll();
Object msg = in.poll();
if (msg == null) {
break;
}
if (!isEncodable(msg)) {
ctx.nextOutboundMessageBuffer().add(msg);
notify = true;
continue;
}
@SuppressWarnings("unchecked")
I imsg = (I) msg;
try {
encode(ctx, msg, out);
encode(ctx, imsg, out);
} catch (Throwable t) {
if (t instanceof CodecException) {
ctx.fireExceptionCaught(t);
@ -40,10 +49,19 @@ public abstract class MessageToStreamEncoder<I> extends ChannelOutboundHandlerAd
}
}
if (out.readableBytes() > oldOutSize) {
if (out.readableBytes() > oldOutSize || notify) {
ctx.flush(future);
}
}
/**
* Returns {@code true} if and only if the specified message can be encoded by this encoder.
*
* @param msg the message
*/
public boolean isEncodable(Object msg) throws Exception {
return true;
}
public abstract void encode(ChannelOutboundHandlerContext<I> ctx, I msg, ChannelBuffer out) throws Exception;
}

View File

@ -59,6 +59,11 @@ public class Base64Decoder extends MessageToMessageDecoder<ChannelBuffer, Channe
this.dialect = dialect;
}
@Override
public boolean isDecodable(Object msg) throws Exception {
return msg instanceof ChannelBuffer;
}
@Override
public ChannelBuffer decode(ChannelInboundHandlerContext<ChannelBuffer> ctx, ChannelBuffer msg) throws Exception {
return Base64.decode(msg, msg.readerIndex(), msg.readableBytes(), dialect);

View File

@ -62,6 +62,11 @@ public class Base64Encoder extends MessageToMessageEncoder<ChannelBuffer, Channe
this.dialect = dialect;
}
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof ChannelBuffer;
}
@Override
public ChannelBuffer encode(ChannelOutboundHandlerContext<ChannelBuffer> ctx,
ChannelBuffer msg) throws Exception {

View File

@ -50,6 +50,11 @@ import io.netty.handler.codec.MessageToMessageDecoder;
*/
public class ByteArrayDecoder extends MessageToMessageDecoder<ChannelBuffer, byte[]> {
@Override
public boolean isDecodable(Object msg) throws Exception {
return msg instanceof ChannelBuffer;
}
@Override
public byte[] decode(ChannelInboundHandlerContext<ChannelBuffer> ctx, ChannelBuffer msg) throws Exception {
byte[] array;

View File

@ -58,6 +58,11 @@ public class ByteArrayEncoder extends MessageToMessageEncoder<byte[], ChannelBuf
return ChannelBufferHolders.messageBuffer();
}
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof byte[];
}
@Override
public ChannelBuffer encode(ChannelOutboundHandlerContext<byte[]> ctx, byte[] msg) throws Exception {
if (msg.length == 0) {

View File

@ -82,6 +82,11 @@ public class ProtobufDecoder extends MessageToMessageDecoder<ChannelBuffer, Mess
this.extensionRegistry = extensionRegistry;
}
@Override
public boolean isDecodable(Object msg) throws Exception {
return msg instanceof ChannelBuffer;
}
@Override
public MessageLite decode(ChannelInboundHandlerContext<ChannelBuffer> ctx, ChannelBuffer msg) throws Exception {
if (msg.hasArray()) {

View File

@ -60,6 +60,11 @@ import com.google.protobuf.MessageLite;
@Sharable
public class ProtobufEncoder extends MessageToMessageEncoder<Object, ChannelBuffer> {
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof MessageLite || msg instanceof MessageLite.Builder;
}
@Override
public ChannelBuffer encode(ChannelOutboundHandlerContext<Object> ctx, Object msg) throws Exception {
if (msg instanceof MessageLite) {

View File

@ -46,6 +46,11 @@ public class ProtobufVarint32LengthFieldPrepender extends MessageToStreamEncoder
public ProtobufVarint32LengthFieldPrepender() {
}
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof ChannelBuffer;
}
@Override
public void encode(
ChannelOutboundHandlerContext<ChannelBuffer> ctx, ChannelBuffer msg, ChannelBuffer out) throws Exception {

View File

@ -25,6 +25,7 @@ import io.netty.util.AttributeKey;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
/**
* An encoder which serializes a Java object into a {@link ChannelBuffer}
@ -75,6 +76,11 @@ public class CompatibleObjectEncoder extends MessageToStreamEncoder<Object> {
return new ObjectOutputStream(out);
}
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof Serializable;
}
@Override
public void encode(ChannelOutboundHandlerContext<Object> ctx, Object msg, ChannelBuffer out) throws Exception {
Attribute<ObjectOutputStream> oosAttr = ctx.attr(OOS);

View File

@ -23,6 +23,7 @@ import io.netty.handler.codec.MessageToStreamEncoder;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
/**
* An encoder which serializes a Java object into a {@link ChannelBuffer}.
@ -38,32 +39,9 @@ import java.io.ObjectOutputStream;
public class ObjectEncoder extends MessageToStreamEncoder<Object> {
private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
private final int estimatedLength;
/**
* Creates a new encoder with the estimated length of 512 bytes.
*/
public ObjectEncoder() {
this(512);
}
/**
* Creates a new encoder.
*
* @param estimatedLength
* the estimated byte length of the serialized form of an object.
* If the length of the serialized form exceeds this value, the
* internal buffer will be expanded automatically at the cost of
* memory bandwidth. If this value is too big, it will also waste
* memory bandwidth. To avoid unnecessary memory copy or allocation
* cost, please specify the properly estimated value.
*/
public ObjectEncoder(int estimatedLength) {
if (estimatedLength < 0) {
throw new IllegalArgumentException(
"estimatedLength: " + estimatedLength);
}
this.estimatedLength = estimatedLength;
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof Serializable;
}
@Override

View File

@ -75,6 +75,11 @@ public class StringDecoder extends MessageToMessageDecoder<ChannelBuffer, String
this.charset = charset;
}
@Override
public boolean isDecodable(Object msg) throws Exception {
return msg instanceof ChannelBuffer;
}
@Override
public String decode(ChannelInboundHandlerContext<ChannelBuffer> ctx, ChannelBuffer msg) throws Exception {
return msg.toString(charset);

View File

@ -73,6 +73,11 @@ public class StringEncoder extends MessageToMessageEncoder<String, ChannelBuffer
this.charset = charset;
}
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof String;
}
@Override
public ChannelBuffer encode(ChannelOutboundHandlerContext<String> ctx, String msg) throws Exception {
return ChannelBuffers.copiedBuffer(msg, charset);

View File

@ -18,7 +18,6 @@ package io.netty.handler.codec.bytes;
import static io.netty.buffer.ChannelBuffers.*;
import static org.hamcrest.core.Is.*;
import static org.junit.Assert.*;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.embedder.DecoderEmbedder;
import java.util.Random;
@ -56,12 +55,6 @@ public class ByteArrayDecoderTest {
public void testDecodeOtherType() {
String str = "Meep!";
embedder.offer(str);
try {
embedder.poll();
fail();
} catch (DecoderException e) {
// Expected
assertTrue(e.getCause() instanceof ClassCastException);
}
assertThat(embedder.poll(), is((Object) str));
}
}

View File

@ -20,7 +20,6 @@ import static org.hamcrest.core.Is.*;
import static org.hamcrest.core.IsNull.*;
import static org.junit.Assert.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.embedder.EncoderEmbedder;
import java.util.Random;
@ -58,13 +57,6 @@ public class ByteArrayEncoderTest {
public void testEncodeOtherType() {
String str = "Meep!";
embedder.offer(str);
try {
embedder.poll();
fail();
} catch (EncoderException e) {
// Expected
assertTrue(e.getCause() instanceof ClassCastException);
}
assertThat(embedder.poll(), is((Object) str));
}
}