From fa1b49de98b898d760859ee547352be544eb6c8f Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 8 Feb 2013 15:44:41 +0900 Subject: [PATCH] More robust automatic messageType detection for ChannelInboundMessageHandlerAdapter and MessageToMessageDecoder --- .../codec/http/HttpContentDecoder.java | 13 +-- .../codec/http/HttpObjectAggregator.java | 2 - .../handler/codec/spdy/SpdyHttpDecoder.java | 7 +- .../handler/codec/MessageToMessageCodec.java | 5 +- .../codec/MessageToMessageDecoder.java | 101 +++--------------- .../handler/codec/base64/Base64Decoder.java | 2 - .../handler/codec/bytes/ByteArrayDecoder.java | 4 - .../codec/protobuf/ProtobufDecoder.java | 9 +- .../handler/codec/string/StringDecoder.java | 2 - .../codec/bytes/ByteArrayDecoderTest.java | 13 +-- .../sctp/SctpInboundByteStreamHandler.java | 4 +- .../sctp/SctpMessageToMessageDecoder.java | 2 +- .../ChannelInboundMessageHandlerAdapter.java | 57 +++++----- .../channel/DefaultChannelPipelineTest.java | 4 +- 14 files changed, 70 insertions(+), 155 deletions(-) diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecoder.java index b50fe7a56f..329a0773c9 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecoder.java @@ -41,21 +41,14 @@ import io.netty.handler.codec.MessageToMessageDecoder; * so that this handler can intercept HTTP requests after {@link HttpObjectDecoder} * converts {@link ByteBuf}s into HTTP requests. */ -public abstract class HttpContentDecoder extends MessageToMessageDecoder { +public abstract class HttpContentDecoder extends MessageToMessageDecoder { private EmbeddedByteChannel decoder; private HttpMessage message; private boolean decodeStarted; - /** - * Creates a new instance. - */ - protected HttpContentDecoder() { - super(HttpObject.class); - } - @Override - protected Object decode(ChannelHandlerContext ctx, Object msg) throws Exception { + protected Object decode(ChannelHandlerContext ctx, HttpObject msg) throws Exception { if (msg instanceof HttpResponse && ((HttpResponse) msg).getStatus().code() == 100) { // 100-continue response must be passed through. return msg; @@ -115,7 +108,7 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder } @Override - protected void freeInboundMessage(Object msg) throws Exception { + protected void freeInboundMessage(HttpObject msg) throws Exception { if (decoder == null) { // if the decoder was null we returned the original message so we are not allowed to free it return; diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectAggregator.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectAggregator.java index 8e171650a9..5ca517078a 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectAggregator.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectAggregator.java @@ -66,8 +66,6 @@ public class HttpObjectAggregator extends MessageToMessageDecoder { * a {@link TooLongFrameException} will be raised. */ public HttpObjectAggregator(int maxContentLength) { - super(HttpObject.class); - if (maxContentLength <= 0) { throw new IllegalArgumentException( "maxContentLength must be a positive integer: " + diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java index 1ccd09bcf7..60f39550d0 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java @@ -52,8 +52,6 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder { * a {@link TooLongFrameException} will be raised. */ public SpdyHttpDecoder(int version, int maxContentLength) { - super(SpdyDataFrame.class, SpdyControlFrame.class); - if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) { throw new IllegalArgumentException( "unsupported version: " + version); @@ -66,6 +64,11 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder { this.maxContentLength = maxContentLength; } + @Override + public boolean acceptInboundMessage(Object msg) throws Exception { + return msg instanceof SpdyDataFrame || msg instanceof SpdyControlFrame; + } + @Override public Object decode(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof SpdySynStreamFrame) { diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java index 1ddeb0e232..f2417ed7c2 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java @@ -73,9 +73,10 @@ public abstract class MessageToMessageCodec private final MessageToMessageDecoder decoder = new MessageToMessageDecoder() { + @Override - public boolean isDecodable(Object msg) throws Exception { - return MessageToMessageCodec.this.isDecodable(msg); + public boolean acceptInboundMessage(Object msg) throws Exception { + return isDecodable(msg); } @Override diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java index 729c8953ab..d18a315de5 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java @@ -16,12 +16,9 @@ package io.netty.handler.codec; import io.netty.buffer.MessageBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelInboundMessageHandler; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelStateHandlerAdapter; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; /** * {@link ChannelInboundMessageHandler} which decodes from one message to an other message @@ -45,81 +42,25 @@ import io.netty.channel.ChannelStateHandlerAdapter; * * */ -public abstract class MessageToMessageDecoder - extends ChannelStateHandlerAdapter implements ChannelInboundMessageHandler { +public abstract class MessageToMessageDecoder extends ChannelInboundMessageHandlerAdapter { - private final Class[] acceptedMsgTypes; + protected MessageToMessageDecoder() { + super(MessageToMessageDecoder.class, 0); + } - /** - * The types which will be accepted by the decoder. If a received message is an other type it will be just forwarded - * to the next {@link ChannelInboundMessageHandler} in the {@link ChannelPipeline} - */ - protected MessageToMessageDecoder(Class... acceptedMsgTypes) { - this.acceptedMsgTypes = ChannelHandlerUtil.acceptedMessageTypes(acceptedMsgTypes); + protected MessageToMessageDecoder( + @SuppressWarnings("rawtypes") + Class parameterizedHandlerType, + int messageTypeParamIndex) { + super(parameterizedHandlerType, messageTypeParamIndex); } @Override - public MessageBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { - return Unpooled.messageBuffer(); - } - - @Override - public void inboundBufferUpdated(ChannelHandlerContext ctx) - throws Exception { - MessageBuf in = ctx.inboundMessageBuffer(); - MessageBuf out = ctx.nextInboundMessageBuffer(); - boolean notify = false; - for (;;) { - try { - Object msg = in.poll(); - if (msg == null) { - break; - } - if (!isDecodable(msg)) { - out.add(msg); - notify = true; - continue; - } - - @SuppressWarnings("unchecked") - I imsg = (I) msg; - boolean free = true; - try { - Object omsg = decode(ctx, imsg); - if (omsg == null) { - // Decoder consumed a message but returned null. - // Probably it needs more messages because it's an aggregator. - continue; - } - if (omsg == imsg) { - free = false; - } - if (ChannelHandlerUtil.unfoldAndAdd(ctx, omsg, true)) { - notify = true; - } - } finally { - if (free) { - freeInboundMessage(imsg); - } - } - } catch (Throwable t) { - if (t instanceof CodecException) { - ctx.fireExceptionCaught(t); - } else { - ctx.fireExceptionCaught(new DecoderException(t)); - } - } + protected final void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception { + Object decoded = decode(ctx, msg); + if (decoded != null) { + ctx.nextInboundMessageBuffer().add(decoded); } - if (notify) { - ctx.fireInboundBufferUpdated(); - } - } - - /** - * Returns {@code true} if and only if the specified message can be decoded by this decoder. - */ - public boolean isDecodable(Object msg) throws Exception { - return ChannelHandlerUtil.acceptMessage(acceptedMsgTypes, msg); } /** @@ -133,18 +74,4 @@ public abstract class MessageToMessageDecoder * @throws Exception is thrown if an error accour */ protected abstract Object decode(ChannelHandlerContext ctx, I msg) throws Exception; - - /** - * Is called after a message was processed via {@link #decode(ChannelHandlerContext, Object)} to free - * up any resources that is held by the inbound message. You may want to override this if your implementation - * just pass-through the input message or need it for later usage. - */ - protected void freeInboundMessage(I msg) throws Exception { - ChannelHandlerUtil.freeMessage(msg); - } - - @Override - public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception { - ctx.inboundMessageBuffer().free(); - } } diff --git a/codec/src/main/java/io/netty/handler/codec/base64/Base64Decoder.java b/codec/src/main/java/io/netty/handler/codec/base64/Base64Decoder.java index ad4e6bcc97..6b33120606 100644 --- a/codec/src/main/java/io/netty/handler/codec/base64/Base64Decoder.java +++ b/codec/src/main/java/io/netty/handler/codec/base64/Base64Decoder.java @@ -53,8 +53,6 @@ public class Base64Decoder extends MessageToMessageDecoder { } public Base64Decoder(Base64Dialect dialect) { - super(ByteBuf.class); - if (dialect == null) { throw new NullPointerException("dialect"); } diff --git a/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayDecoder.java b/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayDecoder.java index 19a3f933c3..145de1ca1a 100644 --- a/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayDecoder.java @@ -48,10 +48,6 @@ import io.netty.handler.codec.MessageToMessageDecoder; */ public class ByteArrayDecoder extends MessageToMessageDecoder { - public ByteArrayDecoder() { - super(ByteBuf.class); - } - @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { byte[] array; diff --git a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufDecoder.java b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufDecoder.java index 5c048ef1c9..4af92e8636 100644 --- a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufDecoder.java @@ -15,6 +15,9 @@ */ package io.netty.handler.codec.protobuf; +import com.google.protobuf.ExtensionRegistry; +import com.google.protobuf.Message; +import com.google.protobuf.MessageLite; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.channel.ChannelHandler.Sharable; @@ -25,10 +28,6 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.MessageToMessageDecoder; -import com.google.protobuf.ExtensionRegistry; -import com.google.protobuf.Message; -import com.google.protobuf.MessageLite; - /** * Decodes a received {@link ByteBuf} into a * Google Protocol Buffers @@ -74,8 +73,6 @@ public class ProtobufDecoder extends MessageToMessageDecoder { } public ProtobufDecoder(MessageLite prototype, ExtensionRegistry extensionRegistry) { - super(ByteBuf.class); - if (prototype == null) { throw new NullPointerException("prototype"); } diff --git a/codec/src/main/java/io/netty/handler/codec/string/StringDecoder.java b/codec/src/main/java/io/netty/handler/codec/string/StringDecoder.java index bae70d756c..6bb0f33925 100644 --- a/codec/src/main/java/io/netty/handler/codec/string/StringDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/string/StringDecoder.java @@ -68,8 +68,6 @@ public class StringDecoder extends MessageToMessageDecoder { * Creates a new instance with the specified character set. */ public StringDecoder(Charset charset) { - super(ByteBuf.class); - if (charset == null) { throw new NullPointerException("charset"); } diff --git a/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayDecoderTest.java index 2faca34bf4..56a2c53c68 100644 --- a/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayDecoderTest.java @@ -15,18 +15,19 @@ */ package io.netty.handler.codec.bytes; -import static io.netty.buffer.Unpooled.*; -import static org.hamcrest.core.Is.*; -import static org.junit.Assert.*; import io.netty.channel.embedded.EmbeddedMessageChannel; - -import java.util.Random; - import org.junit.Before; import org.junit.Test; +import java.util.Random; + +import static io.netty.buffer.Unpooled.*; +import static org.hamcrest.core.Is.*; +import static org.junit.Assert.*; + /** */ +@SuppressWarnings("ZeroLengthArrayAllocation") public class ByteArrayDecoderTest { private EmbeddedMessageChannel ch; diff --git a/transport-sctp/src/main/java/io/netty/handler/codec/sctp/SctpInboundByteStreamHandler.java b/transport-sctp/src/main/java/io/netty/handler/codec/sctp/SctpInboundByteStreamHandler.java index 057ac4f12d..a770988f92 100644 --- a/transport-sctp/src/main/java/io/netty/handler/codec/sctp/SctpInboundByteStreamHandler.java +++ b/transport-sctp/src/main/java/io/netty/handler/codec/sctp/SctpInboundByteStreamHandler.java @@ -40,8 +40,8 @@ public class SctpInboundByteStreamHandler extends ChannelInboundMessageHandlerAd } @Override - public boolean isSupported(Object msg) throws Exception { - if (super.isSupported(msg)) { + public boolean acceptInboundMessage(Object msg) throws Exception { + if (super.acceptInboundMessage(msg)) { return isDecodable((SctpMessage) msg); } return false; diff --git a/transport-sctp/src/main/java/io/netty/handler/codec/sctp/SctpMessageToMessageDecoder.java b/transport-sctp/src/main/java/io/netty/handler/codec/sctp/SctpMessageToMessageDecoder.java index 43689374e9..fc465b72d1 100644 --- a/transport-sctp/src/main/java/io/netty/handler/codec/sctp/SctpMessageToMessageDecoder.java +++ b/transport-sctp/src/main/java/io/netty/handler/codec/sctp/SctpMessageToMessageDecoder.java @@ -23,7 +23,7 @@ import io.netty.handler.codec.MessageToMessageDecoder; public abstract class SctpMessageToMessageDecoder extends MessageToMessageDecoder { @Override - public boolean isDecodable(Object msg) throws Exception { + public boolean acceptInboundMessage(Object msg) throws Exception { if (msg instanceof SctpMessage) { SctpMessage sctpMsg = (SctpMessage) msg; if (sctpMsg.isComplete()) { diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java index 7bfd25cd57..6f5f1f9cb9 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java @@ -18,7 +18,8 @@ package io.netty.channel; import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; -import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -50,34 +51,36 @@ public abstract class ChannelInboundMessageHandlerAdapter private static final ConcurrentMap, Class> messageTypeMap = new ConcurrentHashMap, Class>(); - private final Class acceptedMsgType = findMessageType(); + private final Class acceptedMsgType; - private Class findMessageType() { - Class thisClass = getClass(); + protected ChannelInboundMessageHandlerAdapter() { + this(ChannelInboundMessageHandlerAdapter.class, 0); + } + + protected ChannelInboundMessageHandlerAdapter( + @SuppressWarnings("rawtypes") + Class parameterizedHandlerType, + int messageTypeParamIndex) { + acceptedMsgType = findMessageType(parameterizedHandlerType, messageTypeParamIndex); + } + + private Class findMessageType(Class parameterizedHandlerType, int messageTypeParamIndex) { + final Class thisClass = getClass(); Class messageType = messageTypeMap.get(thisClass); if (messageType == null) { - for (Method m: getClass().getDeclaredMethods()) { - if (!"messageReceived".equals(m.getName())) { - continue; - } - if (m.isSynthetic() || m.isBridge()) { - continue; - } - Class[] p = m.getParameterTypes(); - if (p.length != 2) { - continue; - } - if (p[0] != ChannelHandlerContext.class) { - continue; - } + Class currentClass = thisClass; + for (;;) { + if (currentClass.getSuperclass() == parameterizedHandlerType) { + Type[] types = ((ParameterizedType) currentClass.getGenericSuperclass()).getActualTypeArguments(); + if (types.length - 1 < messageTypeParamIndex || !(types[0] instanceof Class)) { + throw new IllegalStateException( + "cannot determine the inbound message type of " + thisClass.getSimpleName()); + } - messageType = p[1]; - break; - } - - if (messageType == null) { - throw new IllegalStateException( - "cannot determine the inbound message type of " + thisClass.getSimpleName()); + messageType = (Class) types[0]; + break; + } + currentClass = currentClass.getSuperclass(); } messageTypeMap.put(thisClass, messageType); @@ -114,7 +117,7 @@ public abstract class ChannelInboundMessageHandlerAdapter } try { - if (!isSupported(msg)) { + if (!acceptInboundMessage(msg)) { out.add(msg); unsupportedFound = true; continue; @@ -153,7 +156,7 @@ public abstract class ChannelInboundMessageHandlerAdapter * * @param msg the message */ - public boolean isSupported(Object msg) throws Exception { + public boolean acceptInboundMessage(Object msg) throws Exception { return acceptedMsgType.isInstance(msg); } diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index 540916a506..27a0606a77 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -156,9 +156,9 @@ public class DefaultChannelPipelineTest { boolean called; @Override - public boolean isSupported(Object msg) throws Exception { + public boolean acceptInboundMessage(Object msg) throws Exception { called = true; - return super.isSupported(msg); + return super.acceptInboundMessage(msg); } @Override