From 76eb40a4d2c8a651e6b58cdebd3773f84e79b754 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 8 Feb 2013 17:07:01 +0900 Subject: [PATCH] Make ChannelOutboundMessageHandlerAdapter similar to ChannelInboundMessageHandlerAdapter --- .../handler/codec/http/HttpObjectEncoder.java | 7 -- .../codec/http/HttpRequestEncoder.java | 4 +- .../codec/http/HttpResponseEncoder.java | 4 +- .../websocketx/WebSocket00FrameEncoder.java | 8 +- .../websocketx/WebSocket08FrameEncoder.java | 4 +- .../handler/codec/rtsp/RtspObjectEncoder.java | 2 +- .../codec/rtsp/RtspRequestEncoder.java | 2 +- .../codec/rtsp/RtspResponseEncoder.java | 2 +- .../handler/codec/spdy/SpdyFrameEncoder.java | 7 +- .../handler/codec/spdy/SpdyHttpEncoder.java | 2 - .../WebSocketServerProtocolHandlerTest.java | 18 +++- .../codec/socks/SocksMessageEncoder.java | 6 +- .../handler/codec/ByteToMessageCodec.java | 62 +++++++------ .../handler/codec/LengthFieldPrepender.java | 5 +- .../handler/codec/MessageToByteEncoder.java | 59 ++++-------- .../handler/codec/MessageToMessageCodec.java | 2 +- .../codec/MessageToMessageEncoder.java | 93 ++++--------------- .../handler/codec/base64/Base64Encoder.java | 2 - .../handler/codec/bytes/ByteArrayEncoder.java | 49 +++------- .../codec/protobuf/ProtobufEncoder.java | 15 ++- .../ProtobufVarint32LengthFieldPrepender.java | 7 -- .../CompatibleObjectEncoder.java | 6 +- .../codec/serialization/ObjectEncoder.java | 8 +- .../handler/codec/string/StringEncoder.java | 45 +++------ .../example/factorial/NumberEncoder.java | 7 +- .../ChannelOutboundMessageHandlerAdapter.java | 83 +++++++++++++++++ 26 files changed, 219 insertions(+), 290 deletions(-) diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectEncoder.java index ef69147126..87cff6d88a 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectEncoder.java @@ -48,13 +48,6 @@ public abstract class HttpObjectEncoder extends MessageTo @SuppressWarnings("RedundantFieldInitialization") private int state = ST_INIT; - /** - * Creates a new instance. - */ - protected HttpObjectEncoder() { - super(HttpObject.class); - } - @Override protected void encode(ChannelHandlerContext ctx, HttpObject msg, ByteBuf out) throws Exception { if (msg instanceof HttpMessage) { diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpRequestEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpRequestEncoder.java index 332f147379..4e7e2bf0a6 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpRequestEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpRequestEncoder.java @@ -28,8 +28,8 @@ public class HttpRequestEncoder extends HttpObjectEncoder { private static final char SLASH = '/'; @Override - public boolean isEncodable(Object msg) throws Exception { - return super.isEncodable(msg) && !(msg instanceof HttpResponse); + public boolean acceptOutboundMessage(Object msg) throws Exception { + return super.acceptOutboundMessage(msg) && !(msg instanceof HttpResponse); } @Override diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpResponseEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpResponseEncoder.java index d433d44a3d..9d953900d0 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpResponseEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpResponseEncoder.java @@ -27,8 +27,8 @@ import static io.netty.handler.codec.http.HttpConstants.*; public class HttpResponseEncoder extends HttpObjectEncoder { @Override - public boolean isEncodable(Object msg) throws Exception { - return super.isEncodable(msg) && !(msg instanceof HttpRequest); + public boolean acceptOutboundMessage(Object msg) throws Exception { + return super.acceptOutboundMessage(msg) && !(msg instanceof HttpRequest); } @Override diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameEncoder.java index 033f1dbae5..cb62ebd5a9 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameEncoder.java @@ -32,14 +32,8 @@ import io.netty.handler.codec.MessageToByteEncoder; @Sharable public class WebSocket00FrameEncoder extends MessageToByteEncoder { - public WebSocket00FrameEncoder() { - super(WebSocketFrame.class); - } - @Override - public void encode( - ChannelHandlerContext ctx, - WebSocketFrame msg, ByteBuf out) throws Exception { + public void encode(ChannelHandlerContext ctx, WebSocketFrame msg, ByteBuf out) throws Exception { if (msg instanceof TextWebSocketFrame) { // Text frame ByteBuf data = msg.data(); diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameEncoder.java index 1a5a025aba..75518e15ea 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameEncoder.java @@ -90,13 +90,11 @@ public class WebSocket08FrameEncoder extends MessageToByteEncoder extends HttpObjec } @Override - public boolean isEncodable(Object msg) throws Exception { + public boolean acceptOutboundMessage(Object msg) throws Exception { return msg instanceof FullHttpMessage; } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspRequestEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspRequestEncoder.java index 653cd48378..fb5cf936a8 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspRequestEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspRequestEncoder.java @@ -28,7 +28,7 @@ import io.netty.util.CharsetUtil; public class RtspRequestEncoder extends RtspObjectEncoder { @Override - public boolean isEncodable(Object msg) throws Exception { + public boolean acceptOutboundMessage(Object msg) throws Exception { return msg instanceof FullHttpRequest; } diff --git a/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspResponseEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspResponseEncoder.java index f654e392a4..34724f0f4f 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspResponseEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspResponseEncoder.java @@ -28,7 +28,7 @@ import io.netty.util.CharsetUtil; public class RtspResponseEncoder extends RtspObjectEncoder { @Override - public boolean isEncodable(Object msg) throws Exception { + public boolean acceptOutboundMessage(Object msg) throws Exception { return msg instanceof FullHttpResponse; } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java index 344ff694a2..fb2691a9c1 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java @@ -50,8 +50,6 @@ public class SpdyFrameEncoder extends MessageToByteEncoder { * Creates a new instance with the specified parameters. */ public SpdyFrameEncoder(int version, int compressionLevel, int windowBits, int memLevel) { - super(SpdyDataFrame.class, SpdyControlFrame.class); - if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) { throw new IllegalArgumentException( "unknown version: " + version); @@ -61,6 +59,11 @@ public class SpdyFrameEncoder extends MessageToByteEncoder { version, compressionLevel, windowBits, memLevel); } + @Override + public boolean acceptOutboundMessage(Object msg) throws Exception { + return msg instanceof SpdyDataFrame || msg instanceof SpdyControlFrame; + } + @Override public void beforeAdd(ChannelHandlerContext ctx) throws Exception { ctx.channel().closeFuture().addListener(new ChannelFutureListener() { diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java index 5fdc719f91..ea3d9a3cc7 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java @@ -131,8 +131,6 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder { * @param version the protocol version */ public SpdyHttpEncoder(int version) { - super(HttpObject.class); - if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) { throw new IllegalArgumentException( "unsupported version: " + version); diff --git a/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandlerTest.java b/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandlerTest.java index b0d6c1d0ef..35a1b06869 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandlerTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandlerTest.java @@ -16,10 +16,12 @@ package io.netty.handler.codec.http.websocketx; import io.netty.buffer.MessageBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundMessageHandlerAdapter; -import io.netty.channel.ChannelOutboundMessageHandlerAdapter; +import io.netty.channel.ChannelOperationHandlerAdapter; +import io.netty.channel.ChannelOutboundMessageHandler; import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedMessageChannel; import io.netty.handler.codec.http.DefaultHttpRequest; @@ -138,7 +140,19 @@ public class WebSocketServerProtocolHandlerTest { return (FullHttpResponse) outbound.poll(); } - private static class MockOutboundHandler extends ChannelOutboundMessageHandlerAdapter { + private static class MockOutboundHandler + extends ChannelOperationHandlerAdapter implements ChannelOutboundMessageHandler { + + @Override + public MessageBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { + return Unpooled.messageBuffer(); + } + + @Override + public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception { + ctx.outboundMessageBuffer().free(); + } + @Override public void flush(ChannelHandlerContext ctx, ChannelPromise future) throws Exception { //NoOp diff --git a/codec-socks/src/main/java/io/netty/handler/codec/socks/SocksMessageEncoder.java b/codec-socks/src/main/java/io/netty/handler/codec/socks/SocksMessageEncoder.java index d17eec2563..b71b965fa7 100644 --- a/codec-socks/src/main/java/io/netty/handler/codec/socks/SocksMessageEncoder.java +++ b/codec-socks/src/main/java/io/netty/handler/codec/socks/SocksMessageEncoder.java @@ -34,12 +34,8 @@ public class SocksMessageEncoder extends MessageToByteEncoder { return name; } - public SocksMessageEncoder() { - super(SocksMessage.class); - } - @Override - public void encode(ChannelHandlerContext ctx, SocksMessage msg, ByteBuf out) throws Exception { + protected void encode(ChannelHandlerContext ctx, SocksMessage msg, ByteBuf out) throws Exception { msg.encodeAsByteBuf(out); } } diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageCodec.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageCodec.java index 53176f5904..ffa62e6ad3 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageCodec.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageCodec.java @@ -19,43 +19,49 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelOutboundMessageHandler; import io.netty.channel.ChannelPromise; +import io.netty.util.internal.TypeParameterFinder; public abstract class ByteToMessageCodec extends ChannelDuplexHandler implements ChannelInboundByteHandler, ChannelOutboundMessageHandler { - private final Class[] encodableMessageTypes; - private final MessageToByteEncoder encoder; - private final ByteToMessageDecoder decoder; + private final Class acceptedOutboundMsgType; + private final MessageToByteEncoder encoder = new MessageToByteEncoder() { + @Override + public boolean acceptOutboundMessage(Object msg) throws Exception { + return ByteToMessageCodec.this.acceptOutboundMessage(msg); + } - protected ByteToMessageCodec(Class... encodableMessageTypes) { - this.encodableMessageTypes = ChannelHandlerUtil.acceptedMessageTypes(encodableMessageTypes); - encoder = new MessageToByteEncoder() { - @Override - public boolean isEncodable(Object msg) throws Exception { - return ByteToMessageCodec.this.isEncodable(msg); - } + @Override + protected void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception { + ByteToMessageCodec.this.encode(ctx, msg, out); + } + }; - @Override - protected void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception { - ByteToMessageCodec.this.encode(ctx, msg, out); - } - }; + private final ByteToMessageDecoder decoder = new ByteToMessageDecoder() { + @Override + public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + return ByteToMessageCodec.this.decode(ctx, in); + } - decoder = new ByteToMessageDecoder() { - @Override - public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - return ByteToMessageCodec.this.decode(ctx, in); - } + @Override + protected Object decodeLast(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + return ByteToMessageCodec.this.decodeLast(ctx, in); + } + }; - @Override - protected Object decodeLast(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - return ByteToMessageCodec.this.decodeLast(ctx, in); - } - }; + protected ByteToMessageCodec() { + this(ByteToMessageCodec.class, 0); + } + + protected ByteToMessageCodec( + @SuppressWarnings("rawtypes") + Class parameterizedHandlerType, + int messageTypeParamIndex) { + acceptedOutboundMsgType = + TypeParameterFinder.findActualTypeParameter(this, parameterizedHandlerType, messageTypeParamIndex); } @Override @@ -98,8 +104,8 @@ public abstract class ByteToMessageCodec extends ChannelDuplexHandler encoder.flush(ctx, promise); } - public boolean isEncodable(Object msg) throws Exception { - return ChannelHandlerUtil.acceptMessage(encodableMessageTypes, msg); + public boolean acceptOutboundMessage(Object msg) throws Exception { + return acceptedOutboundMsgType.isInstance(msg); } protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception; diff --git a/codec/src/main/java/io/netty/handler/codec/LengthFieldPrepender.java b/codec/src/main/java/io/netty/handler/codec/LengthFieldPrepender.java index 17165aa50e..9094522106 100644 --- a/codec/src/main/java/io/netty/handler/codec/LengthFieldPrepender.java +++ b/codec/src/main/java/io/netty/handler/codec/LengthFieldPrepender.java @@ -78,10 +78,7 @@ public class LengthFieldPrepender extends MessageToByteEncoder { * @throws IllegalArgumentException * if {@code lengthFieldLength} is not 1, 2, 3, 4, or 8 */ - public LengthFieldPrepender( - int lengthFieldLength, boolean lengthIncludesLengthFieldLength) { - super(ByteBuf.class); - + public LengthFieldPrepender(int lengthFieldLength, boolean lengthIncludesLengthFieldLength) { if (lengthFieldLength != 1 && lengthFieldLength != 2 && lengthFieldLength != 3 && lengthFieldLength != 4 && lengthFieldLength != 8) { diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java index fc1b545bd2..5df3615972 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java @@ -21,8 +21,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundMessageHandler; import io.netty.channel.ChannelOutboundMessageHandlerAdapter; import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelHandlerUtil; -import io.netty.channel.ChannelPromise; /** @@ -44,55 +42,30 @@ import io.netty.channel.ChannelPromise; */ public abstract class MessageToByteEncoder extends ChannelOutboundMessageHandlerAdapter { - private final Class[] acceptedMsgTypes; - /** * The types which will be accepted by the encoder. If a received message is an other type it will be just forwared * to the next {@link ChannelOutboundMessageHandler} in the {@link ChannelPipeline} */ - protected MessageToByteEncoder(Class... acceptedMsgTypes) { - this.acceptedMsgTypes = ChannelHandlerUtil.acceptedMessageTypes(acceptedMsgTypes); + protected MessageToByteEncoder() { + this(MessageToByteEncoder.class, 0); + } + + protected MessageToByteEncoder( + @SuppressWarnings("rawtypes") + Class parameterizedHandlerType, + int messageTypeParamIndex) { + super(parameterizedHandlerType, messageTypeParamIndex); } @Override - public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - MessageBuf in = ctx.outboundMessageBuffer(); - ByteBuf out = ctx.nextOutboundByteBuffer(); - - for (;;) { - Object msg = in.poll(); - if (msg == null) { - break; - } - - if (!isEncodable(msg)) { - ChannelHandlerUtil.addToNextOutboundBuffer(ctx, msg); - continue; - } - - @SuppressWarnings("unchecked") - I imsg = (I) msg; - try { - encode(ctx, imsg, out); - } catch (Throwable t) { - if (t instanceof CodecException) { - ctx.fireExceptionCaught(t); - } else { - ctx.fireExceptionCaught(new EncoderException(t)); - } - } + protected void flush(ChannelHandlerContext ctx, I msg) throws Exception { + try { + encode(ctx, msg, ctx.nextOutboundByteBuffer()); + } catch (CodecException e) { + throw e; + } catch (Exception e) { + throw new CodecException(e); } - - ctx.flush(promise); - } - - /** - * Returns {@code true} if and only if the specified message can be encoded by this encoder. - * - * @param msg the message - */ - public boolean isEncodable(Object msg) throws Exception { - return ChannelHandlerUtil.acceptMessage(acceptedMsgTypes, msg); } /** diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java index 8edc103491..b20838dfeb 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java @@ -59,7 +59,7 @@ public abstract class MessageToMessageCodec private final MessageToMessageEncoder encoder = new MessageToMessageEncoder() { @Override - public boolean isEncodable(Object msg) throws Exception { + public boolean acceptOutboundMessage(Object msg) throws Exception { return MessageToMessageCodec.this.acceptOutboundMessage(msg); } diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java index 932aa0d62f..8277de3334 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java @@ -17,12 +17,9 @@ package io.netty.handler.codec; import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelOutboundMessageHandler; import io.netty.channel.ChannelOutboundMessageHandlerAdapter; import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPromise; -import io.netty.channel.PartialFlushException; /** * {@link ChannelOutboundMessageHandlerAdapter} which encodes from one message to an other message @@ -47,77 +44,32 @@ import io.netty.channel.PartialFlushException; */ public abstract class MessageToMessageEncoder extends ChannelOutboundMessageHandlerAdapter { - private final Class[] acceptedMsgTypes; - /** * The types which will be accepted by the decoder. If a received message is an other type it will be just forwared * to the next {@link ChannelOutboundMessageHandler} in the {@link ChannelPipeline} */ - protected MessageToMessageEncoder(Class... acceptedMsgTypes) { - this.acceptedMsgTypes = ChannelHandlerUtil.acceptedMessageTypes(acceptedMsgTypes); + protected MessageToMessageEncoder() { + super(MessageToMessageEncoder.class, 0); + } + + protected MessageToMessageEncoder( + @SuppressWarnings("rawtypes") + Class parameterizedHandlerType, + int messageTypeParamIndex) { + + super(parameterizedHandlerType, messageTypeParamIndex); } @Override - public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - MessageBuf in = ctx.outboundMessageBuffer(); - boolean encoded = false; - - for (;;) { - try { - Object msg = in.poll(); - if (msg == null) { - break; - } - - if (!isEncodable(msg)) { - ChannelHandlerUtil.addToNextOutboundBuffer(ctx, msg); - continue; - } - - @SuppressWarnings("unchecked") - I imsg = (I) msg; - boolean free = true; - try { - Object omsg = encode(ctx, imsg); - if (omsg == null) { - // encode() might be waiting for more inbound messages to generate - // an aggregated message - keep polling. - continue; - } - if (omsg == imsg) { - free = false; - } - encoded = true; - ChannelHandlerUtil.unfoldAndAdd(ctx, omsg, false); - } finally { - if (free) { - freeOutboundMessage(imsg); - } - } - } catch (Throwable t) { - Throwable cause; - if (t instanceof CodecException) { - cause = t; - } else { - cause = new EncoderException(t); - } - if (encoded) { - cause = new PartialFlushException("Unable to encoded all messages", cause); - } - promise.setFailure(cause); - return; - } + protected final void flush(ChannelHandlerContext ctx, I msg) throws Exception { + try { + Object encoded = encode(ctx, msg); + ctx.nextOutboundMessageBuffer().add(encoded); + } catch (CodecException e) { + throw e; + } catch (Exception e) { + throw new CodecException(e); } - ctx.flush(promise); - } - - /** - * Returns {@code true} if and only if the specified message can be encoded by this encoder. - * - * @param msg the message - */ - public boolean isEncodable(Object msg) throws Exception { - return ChannelHandlerUtil.acceptMessage(acceptedMsgTypes, msg); } /** @@ -131,13 +83,4 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundMessageH * @throws Exception is thrown if an error accour */ protected abstract Object encode(ChannelHandlerContext ctx, I msg) throws Exception; - - /** - * Is called after a message was processed via {@link #encode(ChannelHandlerContext, Object)} to free - * up any resources that is held by the inbound message. You may want to override this if your implementation - * just pass-through the input message or need it for later usage. - */ - protected void freeOutboundMessage(I msg) throws Exception { - ChannelHandlerUtil.freeMessage(msg); - } } diff --git a/codec/src/main/java/io/netty/handler/codec/base64/Base64Encoder.java b/codec/src/main/java/io/netty/handler/codec/base64/Base64Encoder.java index f713d912a0..6932f7aceb 100644 --- a/codec/src/main/java/io/netty/handler/codec/base64/Base64Encoder.java +++ b/codec/src/main/java/io/netty/handler/codec/base64/Base64Encoder.java @@ -54,8 +54,6 @@ public class Base64Encoder extends MessageToMessageEncoder { } public Base64Encoder(boolean breakLines, Base64Dialect dialect) { - super(ByteBuf.class); - if (dialect == null) { throw new NullPointerException("dialect"); } diff --git a/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayEncoder.java b/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayEncoder.java index f4c3534645..1ed83f8b24 100644 --- a/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayEncoder.java @@ -17,12 +17,10 @@ package io.netty.handler.codec.bytes; import io.netty.buffer.BufType; import io.netty.buffer.ByteBuf; -import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundMessageHandlerAdapter; import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPromise; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; @@ -62,41 +60,20 @@ public class ByteArrayEncoder extends ChannelOutboundMessageHandlerAdapter in = ctx.outboundMessageBuffer(); - MessageBuf msgOut = ctx.nextOutboundMessageBuffer(); - ByteBuf byteOut = ctx.nextOutboundByteBuffer(); + protected void flush(ChannelHandlerContext ctx, byte[] msg) throws Exception { + if (msg.length == 0) { + return; + } - try { - for (;;) { - Object m = in.poll(); - if (m == null) { - break; - } - - if (!(m instanceof byte[])) { - msgOut.add(m); - continue; - } - - byte[] a = (byte[]) m; - if (a.length == 0) { - continue; - } - - switch (nextBufferType) { - case BYTE: - byteOut.writeBytes(a); - break; - case MESSAGE: - msgOut.add(Unpooled.wrappedBuffer(a)); - break; - default: - throw new Error(); - } - } - } finally { - ctx.flush(promise); + switch (nextBufferType) { + case BYTE: + ctx.nextOutboundByteBuffer().writeBytes(msg); + break; + case MESSAGE: + ctx.nextOutboundMessageBuffer().add(Unpooled.wrappedBuffer(msg)); + break; + default: + throw new Error(); } } } diff --git a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufEncoder.java b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufEncoder.java index 7cda58290b..6d943a8146 100644 --- a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufEncoder.java @@ -15,7 +15,9 @@ */ package io.netty.handler.codec.protobuf; -import static io.netty.buffer.Unpooled.*; +import com.google.protobuf.Message; +import com.google.protobuf.MessageLite; +import com.google.protobuf.MessageLiteOrBuilder; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; @@ -24,8 +26,7 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.MessageToMessageEncoder; -import com.google.protobuf.Message; -import com.google.protobuf.MessageLite; +import static io.netty.buffer.Unpooled.*; /** * Encodes the requested Google @@ -56,14 +57,10 @@ import com.google.protobuf.MessageLite; * @apiviz.landmark */ @Sharable -public class ProtobufEncoder extends MessageToMessageEncoder { - - public ProtobufEncoder() { - super(MessageLite.class, MessageLite.Builder.class); - } +public class ProtobufEncoder extends MessageToMessageEncoder { @Override - protected Object encode(ChannelHandlerContext ctx, Object msg) throws Exception { + protected Object encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg) throws Exception { if (msg instanceof MessageLite) { return wrappedBuffer(((MessageLite) msg).toByteArray()); } diff --git a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrepender.java b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrepender.java index b150d00d0a..b9231781d8 100644 --- a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrepender.java +++ b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrepender.java @@ -39,13 +39,6 @@ import io.netty.handler.codec.MessageToByteEncoder; @Sharable public class ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder { - /** - * Creates a new instance. - */ - public ProtobufVarint32LengthFieldPrepender() { - super(ByteBuf.class); - } - @Override protected void encode( ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { diff --git a/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectEncoder.java b/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectEncoder.java index 64400bfa96..92a8886460 100644 --- a/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectEncoder.java @@ -34,7 +34,7 @@ import java.io.Serializable; * This encoder is interoperable with the standard Java object streams such as * {@link ObjectInputStream} and {@link ObjectOutputStream}. */ -public class CompatibleObjectEncoder extends MessageToByteEncoder { +public class CompatibleObjectEncoder extends MessageToByteEncoder { private static final AttributeKey OOS = new AttributeKey(CompatibleObjectEncoder.class.getName() + ".oos"); @@ -59,8 +59,6 @@ public class CompatibleObjectEncoder extends MessageToByteEncoder { * the long term. */ public CompatibleObjectEncoder(int resetInterval) { - super(Serializable.class); - if (resetInterval < 0) { throw new IllegalArgumentException( "resetInterval: " + resetInterval); @@ -78,7 +76,7 @@ public class CompatibleObjectEncoder extends MessageToByteEncoder { } @Override - protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { + protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception { Attribute oosAttr = ctx.attr(OOS); ObjectOutputStream oos = oosAttr.get(); if (oos == null) { diff --git a/codec/src/main/java/io/netty/handler/codec/serialization/ObjectEncoder.java b/codec/src/main/java/io/netty/handler/codec/serialization/ObjectEncoder.java index 463db27c7d..766186f69b 100644 --- a/codec/src/main/java/io/netty/handler/codec/serialization/ObjectEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/serialization/ObjectEncoder.java @@ -36,15 +36,11 @@ import java.io.Serializable; * @apiviz.has io.netty.handler.codec.serialization.ObjectEncoderOutputStream - - - compatible with */ @Sharable -public class ObjectEncoder extends MessageToByteEncoder { +public class ObjectEncoder extends MessageToByteEncoder { private static final byte[] LENGTH_PLACEHOLDER = new byte[4]; - public ObjectEncoder() { - super(Serializable.class); - } - @Override - protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { + protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception { int startIdx = out.writerIndex(); ByteBufOutputStream bout = new ByteBufOutputStream(out); diff --git a/codec/src/main/java/io/netty/handler/codec/string/StringEncoder.java b/codec/src/main/java/io/netty/handler/codec/string/StringEncoder.java index a02dac8993..81bb50b3ba 100644 --- a/codec/src/main/java/io/netty/handler/codec/string/StringEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/string/StringEncoder.java @@ -17,13 +17,11 @@ package io.netty.handler.codec.string; import io.netty.buffer.BufType; import io.netty.buffer.ByteBuf; -import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundMessageHandlerAdapter; import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPromise; import io.netty.handler.codec.LineBasedFrameDecoder; import java.nio.charset.Charset; @@ -79,39 +77,18 @@ public class StringEncoder extends ChannelOutboundMessageHandlerAdapter in = ctx.outboundMessageBuffer(); - MessageBuf msgOut = ctx.nextOutboundMessageBuffer(); - ByteBuf byteOut = ctx.nextOutboundByteBuffer(); + protected void flush(ChannelHandlerContext ctx, CharSequence msg) throws Exception { + ByteBuf encoded = Unpooled.copiedBuffer(msg, charset); - try { - for (;;) { - Object m = in.poll(); - if (m == null) { - break; - } - - if (!(m instanceof CharSequence)) { - msgOut.add(m); - continue; - } - - CharSequence s = (CharSequence) m; - ByteBuf encoded = Unpooled.copiedBuffer(s, charset); - - switch (nextBufferType) { - case BYTE: - byteOut.writeBytes(encoded); - break; - case MESSAGE: - msgOut.add(encoded); - break; - default: - throw new Error(); - } - } - } finally { - ctx.flush(promise); + switch (nextBufferType) { + case BYTE: + ctx.nextOutboundByteBuffer().writeBytes(encoded); + break; + case MESSAGE: + ctx.nextOutboundMessageBuffer().add(encoded); + break; + default: + throw new Error(); } } } diff --git a/example/src/main/java/io/netty/example/factorial/NumberEncoder.java b/example/src/main/java/io/netty/example/factorial/NumberEncoder.java index ebe79f3e9e..7342a26fd3 100644 --- a/example/src/main/java/io/netty/example/factorial/NumberEncoder.java +++ b/example/src/main/java/io/netty/example/factorial/NumberEncoder.java @@ -28,13 +28,8 @@ import java.math.BigInteger; */ public class NumberEncoder extends MessageToByteEncoder { - public NumberEncoder() { - super(Number.class); - } - @Override - public void encode( - ChannelHandlerContext ctx, Number msg, ByteBuf out) throws Exception { + public void encode(ChannelHandlerContext ctx, Number msg, ByteBuf out) throws Exception { // Convert to a BigInteger first for easier implementation. BigInteger v; if (msg instanceof BigInteger) { diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java index 323ac74bc9..bf6ac1971e 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java @@ -17,6 +17,7 @@ package io.netty.channel; import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; +import io.netty.util.internal.TypeParameterFinder; /** * Abstract base class which handles messages of a specific type. @@ -26,6 +27,20 @@ import io.netty.buffer.Unpooled; public abstract class ChannelOutboundMessageHandlerAdapter extends ChannelOperationHandlerAdapter implements ChannelOutboundMessageHandler { + private final Class acceptedMsgType; + + protected ChannelOutboundMessageHandlerAdapter() { + this(ChannelOutboundMessageHandlerAdapter.class, 0); + } + + protected ChannelOutboundMessageHandlerAdapter( + @SuppressWarnings("rawtypes") + Class parameterizedHandlerType, + int messageTypeParamIndex) { + acceptedMsgType = TypeParameterFinder.findActualTypeParameter( + this, parameterizedHandlerType, messageTypeParamIndex); + } + @Override public MessageBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { return Unpooled.messageBuffer(); @@ -35,4 +50,72 @@ public abstract class ChannelOutboundMessageHandlerAdapter public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception { ctx.outboundMessageBuffer().free(); } + + /** + * Returns {@code true} if and only if the specified message can be handled by this handler. + * + * @param msg the message + */ + public boolean acceptOutboundMessage(Object msg) throws Exception { + return acceptedMsgType.isInstance(msg); + } + + @Override + public final void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + MessageBuf in = ctx.outboundMessageBuffer(); + MessageBuf out = null; + ChannelPromise nextPromise = promise; + try { + for (;;) { + Object msg = in.poll(); + if (msg == null) { + break; + } + + try { + if (!acceptOutboundMessage(msg)) { + if (out == null) { + out = ctx.nextOutboundMessageBuffer(); + } + out.add(msg); + continue; + } + + @SuppressWarnings("unchecked") + I imsg = (I) msg; + try { + flush(ctx, imsg); + } finally { + freeOutboundMessage(imsg); + } + } catch (Throwable t) { + if (!promise.isDone()) { + promise.setFailure(new PartialFlushException( + "faied to encode all messages associated with the future", t)); + nextPromise = ctx.newPromise(); + } + } + } + } finally { + ctx.flush(nextPromise); + } + } + + /** + * Is called once a message is being flushed. + * + * @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to + * @param msg the message to handle + * @throws Exception thrown when an error accour + */ + protected abstract void flush(ChannelHandlerContext ctx, I msg) throws Exception; + + /** + * Is called after a message was processed via {@link #flush(ChannelHandlerContext, Object)} to free + * up any resources that is held by the outbound message. You may want to override this if your implementation + * just pass-through the input message or need it for later usage. + */ + protected void freeOutboundMessage(I msg) throws Exception { + ChannelHandlerUtil.freeMessage(msg); + } }