From d4742bbe16bd4864bf824d5946de16382de72a73 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Wed, 6 Feb 2013 12:55:42 +0900 Subject: [PATCH] Clean up abstract ChannelHandler impls / Remove ChannelHandlerContext.hasNext*() - Rename ChannelHandlerAdapter to ChannelDuplexHandler - Add ChannelHandlerAdapter that implements only ChannelHandler - Rename CombinedChannelHandler to CombinedChannelDuplexHandler and improve runtime validation - Remove ChannelInbound/OutboundHandlerAdapter which are not useful - Make ChannelOutboundByteHandlerAdapter similar to ChannelInboundByteHandlerAdapter - Make the tail and head handler of DefaultChannelPipeline accept both bytes and messages. ChannelHandlerContext.hasNext*() were removed because they always return true now. - Removed various unnecessary null checks. - Correct method/field names: inboundBufferSuspended -> channelReadSuspended --- .../netty/buffer/QueueBackedMessageBuf.java | 3 + .../handler/codec/http/HttpClientCodec.java | 59 +- .../handler/codec/http/HttpObjectEncoder.java | 10 +- .../handler/codec/http/HttpServerCodec.java | 51 +- .../handler/codec/http/package-info.java | 1 - .../handler/codec/spdy/SpdyFrameCodec.java | 44 +- .../handler/codec/spdy/SpdyHttpCodec.java | 43 +- .../handler/codec/spdy/SpdyHttpEncoder.java | 6 +- .../handler/codec/spdy/SpdyOrHttpChooser.java | 4 +- .../codec/spdy/SpdySessionHandler.java | 4 +- .../netty/handler/codec/ByteToByteCodec.java | 4 +- .../handler/codec/ByteToByteEncoder.java | 3 +- .../handler/codec/ByteToMessageCodec.java | 4 +- .../handler/codec/MessageToMessageCodec.java | 4 +- .../codec/MessageToMessageDecoder.java | 3 +- .../netty/handler/logging/LoggingHandler.java | 4 +- .../java/io/netty/handler/ssl/SslHandler.java | 4 +- .../handler/stream/ChunkedWriteHandler.java | 4 +- .../handler/timeout/IdleStateHandler.java | 4 +- .../handler/timeout/ReadTimeoutHandler.java | 4 +- .../handler/timeout/WriteTimeoutHandler.java | 4 +- .../AbstractTrafficShapingHandler.java | 4 +- .../sctp/SctpOutboundByteStreamHandler.java | 4 +- .../io/netty/channel/AbstractChannel.java | 9 +- .../netty/channel/ChannelDuplexHandler.java | 124 +++++ .../netty/channel/ChannelHandlerAdapter.java | 125 ++--- .../netty/channel/ChannelHandlerContext.java | 40 +- .../io/netty/channel/ChannelHandlerUtil.java | 33 +- .../ChannelInboundByteHandlerAdapter.java | 4 +- .../channel/ChannelInboundHandlerAdapter.java | 43 -- .../netty/channel/ChannelInboundInvoker.java | 4 +- .../ChannelInboundMessageHandlerAdapter.java | 8 +- .../ChannelOperationHandlerAdapter.java | 58 +- .../ChannelOutboundByteHandlerAdapter.java | 19 +- .../ChannelOutboundHandlerAdapter.java | 43 -- .../netty/channel/ChannelOutboundInvoker.java | 2 +- .../ChannelOutboundMessageHandlerAdapter.java | 2 +- .../channel/ChannelStateHandlerAdapter.java | 70 +-- .../channel/CombinedChannelDuplexHandler.java | 244 +++++++++ .../netty/channel/CombinedChannelHandler.java | 225 -------- .../channel/DefaultChannelHandlerContext.java | 509 +++++++----------- .../netty/channel/DefaultChannelPipeline.java | 184 +++++-- .../io/netty/channel/local/LocalChannel.java | 4 +- .../channel/local/LocalServerChannel.java | 4 +- .../channel/nio/AbstractNioByteChannel.java | 10 +- .../nio/AbstractNioMessageChannel.java | 10 +- .../channel/oio/AbstractOioByteChannel.java | 4 +- .../oio/AbstractOioMessageChannel.java | 10 +- .../socket/aio/AioServerSocketChannel.java | 2 +- .../channel/socket/aio/AioSocketChannel.java | 10 +- .../netty/channel/AbstractEventLoopTest.java | 4 +- .../channel/DefaultChannelPipelineTest.java | 91 +++- .../local/LocalTransportThreadModelTest.java | 12 +- 53 files changed, 1121 insertions(+), 1061 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/ChannelDuplexHandler.java delete mode 100644 transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java delete mode 100644 transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java create mode 100644 transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java delete mode 100644 transport/src/main/java/io/netty/channel/CombinedChannelHandler.java diff --git a/buffer/src/main/java/io/netty/buffer/QueueBackedMessageBuf.java b/buffer/src/main/java/io/netty/buffer/QueueBackedMessageBuf.java index a72c468d32..08dab0a4e6 100644 --- a/buffer/src/main/java/io/netty/buffer/QueueBackedMessageBuf.java +++ b/buffer/src/main/java/io/netty/buffer/QueueBackedMessageBuf.java @@ -33,6 +33,9 @@ final class QueueBackedMessageBuf extends AbstractMessageBuf { @Override public boolean offer(T e) { + if (e == null) { + throw new NullPointerException("e"); + } checkUnfreed(); return isWritable() && queue.offer(e); } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java index 3f623481c7..2b8e1af0e9 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java @@ -16,9 +16,12 @@ package io.netty.handler.codec.http; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.CombinedChannelHandler; +import io.netty.channel.ChannelInboundByteHandler; +import io.netty.channel.ChannelOutboundMessageHandler; +import io.netty.channel.CombinedChannelDuplexHandler; import io.netty.handler.codec.PrematureChannelClosureException; import java.util.ArrayDeque; @@ -42,13 +45,15 @@ import java.util.concurrent.atomic.AtomicLong; * @apiviz.has io.netty.handler.codec.http.HttpResponseDecoder * @apiviz.has io.netty.handler.codec.http.HttpRequestEncoder */ -public class HttpClientCodec extends CombinedChannelHandler { +public final class HttpClientCodec + extends CombinedChannelDuplexHandler + implements ChannelInboundByteHandler, ChannelOutboundMessageHandler { /** A queue that is used for correlating a request and a response. */ - final Queue queue = new ArrayDeque(); + private final Queue queue = new ArrayDeque(); /** If true, decoding stops (i.e. pass-through) */ - volatile boolean done; + private volatile boolean done; private final AtomicLong requestResponseCounter = new AtomicLong(); private final boolean failOnMissingResponse; @@ -65,25 +70,53 @@ public class HttpClientCodec extends CombinedChannelHandler { /** * Creates a new instance with the specified decoder options. */ - public HttpClientCodec( - int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) { + public HttpClientCodec(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) { this(maxInitialLineLength, maxHeaderSize, maxChunkSize, false); } public HttpClientCodec( - int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, - boolean failOnMissingResponse) { - - init( - new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize), - new Encoder()); + int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse) { + init(new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize), new Encoder()); this.failOnMissingResponse = failOnMissingResponse; } + private Decoder decoder() { + return (Decoder) stateHandler(); + } + + private Encoder encoder() { + return (Encoder) operationHandler(); + } + + @Override + public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { + return decoder().newInboundBuffer(ctx); + } + + @Override + public void discardInboundReadBytes(ChannelHandlerContext ctx) throws Exception { + decoder().discardInboundReadBytes(ctx); + } + + @Override + public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception { + decoder().freeInboundBuffer(ctx); + } + + @Override + public MessageBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { + return encoder().newOutboundBuffer(ctx); + } + + @Override + public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception { + encoder().freeOutboundBuffer(ctx); + } + private final class Encoder extends HttpRequestEncoder { @Override protected void encode( - ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { + ChannelHandlerContext ctx, HttpObject msg, ByteBuf out) throws Exception { if (msg instanceof HttpRequest && !done) { queue.offer(((HttpRequest) msg).getMethod()); } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectEncoder.java index ec853d9cb1..ef69147126 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectEncoder.java @@ -39,7 +39,7 @@ import static io.netty.handler.codec.http.HttpConstants.*; * implement all abstract methods properly. * @apiviz.landmark */ -public abstract class HttpObjectEncoder extends MessageToByteEncoder { +public abstract class HttpObjectEncoder extends MessageToByteEncoder { private static final int ST_INIT = 0; private static final int ST_CONTENT_NON_CHUNK = 1; @@ -56,17 +56,17 @@ public abstract class HttpObjectEncoder extends MessageTo } @Override - @SuppressWarnings("unchecked") - protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { + protected void encode(ChannelHandlerContext ctx, HttpObject msg, ByteBuf out) throws Exception { if (msg instanceof HttpMessage) { if (state != ST_INIT) { throw new IllegalStateException("unexpected message type: " + msg.getClass().getSimpleName()); } - HttpMessage m = (HttpMessage) msg; + @SuppressWarnings({ "unchecked", "CastConflictsWithInstanceof" }) + H m = (H) msg; // Encode the message. - encodeInitialLine(out, (H) m); + encodeInitialLine(out, m); encodeHeaders(out, m); out.writeByte(CR); out.writeByte(LF); diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerCodec.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerCodec.java index c010644937..dfd5328759 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerCodec.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerCodec.java @@ -15,7 +15,12 @@ */ package io.netty.handler.codec.http; -import io.netty.channel.CombinedChannelHandler; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundByteHandler; +import io.netty.channel.ChannelOutboundMessageHandler; +import io.netty.channel.CombinedChannelDuplexHandler; /** @@ -26,7 +31,9 @@ import io.netty.channel.CombinedChannelHandler; * @apiviz.has io.netty.handler.codec.http.HttpRequestDecoder * @apiviz.has io.netty.handler.codec.http.HttpResponseEncoder */ -public class HttpServerCodec extends CombinedChannelHandler { +public final class HttpServerCodec + extends CombinedChannelDuplexHandler + implements ChannelInboundByteHandler, ChannelOutboundMessageHandler { /** * Creates a new instance with the default decoder options @@ -40,10 +47,40 @@ public class HttpServerCodec extends CombinedChannelHandler { /** * Creates a new instance with the specified decoder options. */ - public HttpServerCodec( - int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) { - super( - new HttpRequestDecoder(maxInitialLineLength, maxHeaderSize, maxChunkSize), - new HttpResponseEncoder()); + public HttpServerCodec(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) { + super(new HttpRequestDecoder(maxInitialLineLength, maxHeaderSize, maxChunkSize), new HttpResponseEncoder()); + } + + private HttpRequestDecoder decoder() { + return (HttpRequestDecoder) stateHandler(); + } + + private HttpResponseEncoder encoder() { + return (HttpResponseEncoder) operationHandler(); + } + + @Override + public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { + return decoder().newInboundBuffer(ctx); + } + + @Override + public void discardInboundReadBytes(ChannelHandlerContext ctx) throws Exception { + decoder().discardInboundReadBytes(ctx); + } + + @Override + public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception { + decoder().freeInboundBuffer(ctx); + } + + @Override + public MessageBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { + return encoder().newOutboundBuffer(ctx); + } + + @Override + public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception { + encoder().freeOutboundBuffer(ctx); } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/package-info.java b/codec-http/src/main/java/io/netty/handler/codec/http/package-info.java index 9fbc130ccf..831b2b9a45 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/package-info.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/package-info.java @@ -18,7 +18,6 @@ * Encoder, decoder and their related message types for HTTP. * * @apiviz.exclude ^java\.lang\. - * @apiviz.exclude OneToOne(Encoder|Decoder)$ * @apiviz.exclude \.HttpHeaders\. * @apiviz.exclude \.codec\.replay\. * @apiviz.exclude \.(Simple)?Channel[A-Za-z]*Handler$ diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameCodec.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameCodec.java index 28943b3625..813401eb2d 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameCodec.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameCodec.java @@ -15,7 +15,12 @@ */ package io.netty.handler.codec.spdy; -import io.netty.channel.CombinedChannelHandler; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundByteHandler; +import io.netty.channel.ChannelOutboundMessageHandler; +import io.netty.channel.CombinedChannelDuplexHandler; /** @@ -24,7 +29,9 @@ import io.netty.channel.CombinedChannelHandler; * @apiviz.has io.netty.handler.codec.spdy.SpdyFrameDecoder * @apiviz.has io.netty.handler.codec.spdy.SpdyFrameEncoder */ -public class SpdyFrameCodec extends CombinedChannelHandler { +public final class SpdyFrameCodec + extends CombinedChannelDuplexHandler + implements ChannelInboundByteHandler, ChannelOutboundMessageHandler { /** * Creates a new instance with the specified {@code version} and @@ -47,4 +54,37 @@ public class SpdyFrameCodec extends CombinedChannelHandler { new SpdyFrameDecoder(version, maxChunkSize, maxHeaderSize), new SpdyFrameEncoder(version, compressionLevel, windowBits, memLevel)); } + + private SpdyFrameDecoder decoder() { + return (SpdyFrameDecoder) stateHandler(); + } + + private SpdyFrameEncoder encoder() { + return (SpdyFrameEncoder) operationHandler(); + } + + @Override + public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { + return decoder().newInboundBuffer(ctx); + } + + @Override + public void discardInboundReadBytes(ChannelHandlerContext ctx) throws Exception { + decoder().discardInboundReadBytes(ctx); + } + + @Override + public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception { + decoder().freeInboundBuffer(ctx); + } + + @Override + public MessageBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { + return encoder().newOutboundBuffer(ctx); + } + + @Override + public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception { + encoder().freeOutboundBuffer(ctx); + } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpCodec.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpCodec.java index 039e538578..f92a41e2b5 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpCodec.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpCodec.java @@ -15,7 +15,12 @@ */ package io.netty.handler.codec.spdy; -import io.netty.channel.CombinedChannelHandler; +import io.netty.buffer.MessageBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundMessageHandler; +import io.netty.channel.ChannelOutboundMessageHandler; +import io.netty.channel.CombinedChannelDuplexHandler; +import io.netty.handler.codec.http.HttpObject; /** @@ -23,14 +28,42 @@ import io.netty.channel.CombinedChannelHandler; * @apiviz.has io.netty.handler.codec.sdpy.SpdyHttpDecoder * @apiviz.has io.netty.handler.codec.spdy.SpdyHttpEncoder */ -public class SpdyHttpCodec extends CombinedChannelHandler { +public final class SpdyHttpCodec + extends CombinedChannelDuplexHandler + implements ChannelInboundMessageHandler, ChannelOutboundMessageHandler { /** * Creates a new instance with the specified decoder options. */ public SpdyHttpCodec(int version, int maxContentLength) { - super( - new SpdyHttpDecoder(version, maxContentLength), - new SpdyHttpEncoder(version)); + super(new SpdyHttpDecoder(version, maxContentLength), new SpdyHttpEncoder(version)); + } + + private SpdyHttpDecoder decoder() { + return (SpdyHttpDecoder) stateHandler(); + } + + private SpdyHttpEncoder encoder() { + return (SpdyHttpEncoder) operationHandler(); + } + + @Override + public MessageBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { + return decoder().newInboundBuffer(ctx); + } + + @Override + public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception { + decoder().freeInboundBuffer(ctx); + } + + @Override + public MessageBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { + return encoder().newOutboundBuffer(ctx); + } + + @Override + public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception { + encoder().freeOutboundBuffer(ctx); } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java index 59010a1309..5fdc719f91 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java @@ -120,7 +120,7 @@ import java.util.Map; * All pushed resources should be sent before sending the response * that corresponds to the initial request. */ -public class SpdyHttpEncoder extends MessageToMessageEncoder { +public class SpdyHttpEncoder extends MessageToMessageEncoder { private final int spdyVersion; private volatile int currentStreamId; @@ -141,7 +141,7 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder { } @Override - public Object encode(ChannelHandlerContext ctx, Object msg) throws Exception { + public Object encode(ChannelHandlerContext ctx, HttpObject msg) throws Exception { List out = new ArrayList(); if (msg instanceof HttpRequest) { @@ -288,7 +288,7 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder { } @Override - protected void freeOutboundMessage(Object msg) throws Exception { + protected void freeOutboundMessage(HttpObject msg) throws Exception { if (msg instanceof HttpContent) { // Will be freed later as the content of them is just reused here return; diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyOrHttpChooser.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyOrHttpChooser.java index fdc036ce7e..0abf766cde 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyOrHttpChooser.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyOrHttpChooser.java @@ -16,8 +16,8 @@ package io.netty.handler.codec.spdy; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelInboundMessageHandler; @@ -35,7 +35,7 @@ import javax.net.ssl.SSLEngine; * much about the low-level details. * */ -public abstract class SpdyOrHttpChooser extends ChannelHandlerAdapter implements ChannelInboundByteHandler { +public abstract class SpdyOrHttpChooser extends ChannelDuplexHandler implements ChannelInboundByteHandler { public enum SelectedProtocol { SpdyVersion2, diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java index 35463f809e..797aa6202c 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java @@ -17,9 +17,9 @@ package io.netty.handler.codec.spdy; import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundMessageHandler; import io.netty.channel.ChannelOutboundMessageHandler; @@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; * Manages streams within a SPDY session. */ public class SpdySessionHandler - extends ChannelHandlerAdapter + extends ChannelDuplexHandler implements ChannelInboundMessageHandler, ChannelOutboundMessageHandler { private static final SpdyProtocolException PROTOCOL_EXCEPTION = new SpdyProtocolException(); diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToByteCodec.java b/codec/src/main/java/io/netty/handler/codec/ByteToByteCodec.java index f2ea493add..faaa2f3f64 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToByteCodec.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToByteCodec.java @@ -16,7 +16,7 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelOutboundByteHandler; @@ -53,7 +53,7 @@ import io.netty.channel.ChannelPromise; * */ public abstract class ByteToByteCodec - extends ChannelHandlerAdapter + extends ChannelDuplexHandler implements ChannelInboundByteHandler, ChannelOutboundByteHandler { private final ByteToByteEncoder encoder = new ByteToByteEncoder() { diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToByteEncoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToByteEncoder.java index cdb64487c3..0408e89112 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToByteEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToByteEncoder.java @@ -47,8 +47,7 @@ import io.netty.channel.PartialFlushException; public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapter { @Override - public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - ByteBuf in = ctx.outboundByteBuffer(); + protected void flush(ChannelHandlerContext ctx, ByteBuf in, ChannelPromise promise) throws Exception { ByteBuf out = ctx.nextOutboundByteBuffer(); boolean encoded = false; diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageCodec.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageCodec.java index 76379b4159..6a92b94bbd 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageCodec.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageCodec.java @@ -17,14 +17,14 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; import io.netty.buffer.MessageBuf; -import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelOutboundMessageHandler; import io.netty.channel.ChannelPromise; -public abstract class ByteToMessageCodec extends ChannelHandlerAdapter +public abstract class ByteToMessageCodec extends ChannelDuplexHandler implements ChannelInboundByteHandler, ChannelOutboundMessageHandler { private final Class[] encodableMessageTypes; diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java index d82b25edb2..1ddeb0e232 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java @@ -16,7 +16,7 @@ package io.netty.handler.codec; import io.netty.buffer.MessageBuf; -import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelInboundMessageHandler; @@ -49,7 +49,7 @@ import io.netty.channel.ChannelPromise; * */ public abstract class MessageToMessageCodec - extends ChannelHandlerAdapter + extends ChannelDuplexHandler implements ChannelInboundMessageHandler, ChannelOutboundMessageHandler { diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java index 30cb8269d5..729c8953ab 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java @@ -67,6 +67,7 @@ public abstract class MessageToMessageDecoder public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception { MessageBuf in = ctx.inboundMessageBuffer(); + MessageBuf out = ctx.nextInboundMessageBuffer(); boolean notify = false; for (;;) { try { @@ -75,7 +76,7 @@ public abstract class MessageToMessageDecoder break; } if (!isDecodable(msg)) { - ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); + out.add(msg); notify = true; continue; } diff --git a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java index dc3256efa3..ddcbe0971b 100644 --- a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java +++ b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java @@ -15,9 +15,9 @@ */ package io.netty.handler.logging; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler.Sharable; -import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.logging.InternalLogLevel; @@ -32,7 +32,7 @@ import java.net.SocketAddress; * @apiviz.landmark */ @Sharable -public class LoggingHandler extends ChannelHandlerAdapter { +public class LoggingHandler extends ChannelDuplexHandler { private static final LogLevel DEFAULT_LEVEL = LogLevel.DEBUG; diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 07e8ec2686..6eac119320 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -18,10 +18,10 @@ package io.netty.handler.ssl; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFlushPromiseNotifier; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelOutboundByteHandler; @@ -142,7 +142,7 @@ import java.util.regex.Pattern; * @apiviz.uses io.netty.handler.ssl.SslBufferPool */ public class SslHandler - extends ChannelHandlerAdapter + extends ChannelDuplexHandler implements ChannelInboundByteHandler, ChannelOutboundByteHandler { private static final InternalLogger logger = diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index c51ff20e6a..2f19449fbd 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -18,11 +18,11 @@ package io.netty.handler.stream; import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundMessageHandler; import io.netty.channel.ChannelPipeline; @@ -67,7 +67,7 @@ import java.util.concurrent.atomic.AtomicInteger; * @apiviz.has io.netty.handler.stream.ChunkedInput oneway - - reads from */ public class ChunkedWriteHandler - extends ChannelHandlerAdapter implements ChannelOutboundMessageHandler { + extends ChannelDuplexHandler implements ChannelOutboundMessageHandler { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChunkedWriteHandler.class); diff --git a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java index 067ad5a042..5a3ecb5d54 100644 --- a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java @@ -17,9 +17,9 @@ package io.netty.handler.timeout; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOperationHandler; @@ -75,7 +75,7 @@ import java.util.concurrent.TimeUnit; * } * * // Handler should handle the {@link IdleStateEvent} triggered by {@link IdleStateHandler}. - * public class MyHandler extends {@link ChannelHandlerAdapter} { + * public class MyHandler extends {@link ChannelDuplexHandler} { * {@code @Override} * public void userEventTriggered({@link ChannelHandlerContext} ctx, {@link Object} evt) throws {@link Exception} { * if (evt instanceof {@link IdleState}} { diff --git a/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java index 99ecd9131d..52aea5e69d 100644 --- a/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java @@ -17,7 +17,7 @@ package io.netty.handler.timeout; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelStateHandlerAdapter; @@ -41,7 +41,7 @@ import java.util.concurrent.TimeUnit; * } * * // Handler should handle the {@link ReadTimeoutException}. - * public class MyHandler extends {@link ChannelHandlerAdapter} { + * public class MyHandler extends {@link ChannelDuplexHandler} { * {@code @Override} * public void exceptionCaught({@link ChannelHandlerContext} ctx, {@link Throwable} cause) * throws {@link Exception} { diff --git a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java index 6ef6b3e75b..12d3fac0d2 100644 --- a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java @@ -17,9 +17,9 @@ package io.netty.handler.timeout; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOperationHandlerAdapter; @@ -45,7 +45,7 @@ import java.util.concurrent.TimeUnit; * } * * // Handler should handle the {@link WriteTimeoutException}. - * public class MyHandler extends {@link ChannelHandlerAdapter} { + * public class MyHandler extends {@link ChannelDuplexHandler} { * {@code @Override} * public void exceptionCaught({@link ChannelHandlerContext} ctx, {@link Throwable} cause) * throws {@link Exception} { diff --git a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java index 5d9054787b..d8de674e43 100644 --- a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java @@ -16,7 +16,7 @@ package io.netty.handler.traffic; import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelOutboundByteHandler; @@ -43,7 +43,7 @@ import java.util.concurrent.TimeUnit; * or start the monitoring, to change the checkInterval directly, or to have access to its values. * */ -public abstract class AbstractTrafficShapingHandler extends ChannelHandlerAdapter +public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler implements ChannelInboundByteHandler, ChannelOutboundByteHandler { /** diff --git a/transport-sctp/src/main/java/io/netty/handler/codec/sctp/SctpOutboundByteStreamHandler.java b/transport-sctp/src/main/java/io/netty/handler/codec/sctp/SctpOutboundByteStreamHandler.java index 1510025c16..f4ff1cfc79 100644 --- a/transport-sctp/src/main/java/io/netty/handler/codec/sctp/SctpOutboundByteStreamHandler.java +++ b/transport-sctp/src/main/java/io/netty/handler/codec/sctp/SctpOutboundByteStreamHandler.java @@ -43,9 +43,7 @@ public class SctpOutboundByteStreamHandler extends ChannelOutboundByteHandlerAda } @Override - public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - ByteBuf in = ctx.outboundByteBuffer(); - + protected void flush(ChannelHandlerContext ctx, ByteBuf in, ChannelPromise promise) throws Exception { try { MessageBuf out = ctx.nextOutboundMessageBuffer(); ByteBuf payload = Unpooled.buffer(in.readableBytes()); diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index bfdfc46f8e..3915c94852 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -15,6 +15,7 @@ */ package io.netty.channel; +import io.netty.buffer.BufType; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.MessageBuf; @@ -822,7 +823,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private int outboundBufSize() { final int bufSize; final ChannelHandlerContext ctx = directOutboundContext(); - if (ctx.hasOutboundByteBuffer()) { + if (metadata().bufferType() == BufType.BYTE) { bufSize = ctx.outboundByteBuffer().readableBytes(); } else { bufSize = ctx.outboundMessageBuffer().size(); @@ -869,7 +870,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha ChannelHandlerContext ctx = directOutboundContext(); Throwable cause = null; try { - if (ctx.hasOutboundByteBuffer()) { + if (metadata().bufferType() == BufType.BYTE) { ByteBuf out = ctx.outboundByteBuffer(); int oldSize = out.readableBytes(); try { @@ -877,7 +878,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } catch (Throwable t) { cause = t; } finally { - flushFutureNotifier.increaseWriteCounter(oldSize - out.readableBytes()); + int delta = oldSize - out.readableBytes(); + out.discardSomeReadBytes(); + flushFutureNotifier.increaseWriteCounter(delta); } } else { MessageBuf out = ctx.outboundMessageBuffer(); diff --git a/transport/src/main/java/io/netty/channel/ChannelDuplexHandler.java b/transport/src/main/java/io/netty/channel/ChannelDuplexHandler.java new file mode 100644 index 0000000000..473ad5cde7 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelDuplexHandler.java @@ -0,0 +1,124 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import java.net.SocketAddress; + +/** + * {@link ChannelHandler} implementation which represents a combination out of a {@link ChannelStateHandler} and + * the {@link ChannelOperationHandler}. + * + * It is a good starting point if your {@link ChannelHandler} implementation needs to intercept operations and also + * state updates. + */ +public abstract class ChannelDuplexHandler extends ChannelStateHandlerAdapter implements ChannelOperationHandler { + + /** + * Calls {@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)} to forward + * to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}. + * + * Sub-classes may override this method to change behavior. + */ + @Override + public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, + ChannelPromise future) throws Exception { + ctx.bind(localAddress, future); + } + + /** + * Calls {@link ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)} to forward + * to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}. + * + * Sub-classes may override this method to change behavior. + */ + @Override + public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, + SocketAddress localAddress, ChannelPromise future) throws Exception { + ctx.connect(remoteAddress, localAddress, future); + } + + /** + * Calls {@link ChannelHandlerContext#disconnect(ChannelPromise)} to forward + * to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}. + * + * Sub-classes may override this method to change behavior. + */ + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelPromise future) + throws Exception { + ctx.disconnect(future); + } + + /** + * Calls {@link ChannelHandlerContext#close(ChannelPromise)} to forward + * to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}. + * + * Sub-classes may override this method to change behavior. + */ + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise future) + throws Exception { + ctx.close(future); + } + + /** + * Calls {@link ChannelHandlerContext#close(ChannelPromise)} to forward + * to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}. + * + * Sub-classes may override this method to change behavior. + */ + @Override + public void deregister(ChannelHandlerContext ctx, ChannelPromise future) + throws Exception { + ctx.deregister(future); + } + + @Override + public void read(ChannelHandlerContext ctx) { + ctx.read(); + } + + /** + * Calls {@link ChannelHandlerContext#flush(ChannelPromise)} to forward + * to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}. + * + * Sub-classes may override this method to change behavior. + * + * Be aware that if your class also implement {@link ChannelOutboundHandler} it need to {@code @Override} this + * method and provide some proper implementation. Fail to do so, will result in an {@link IllegalStateException}! + */ + @Override + public void flush(ChannelHandlerContext ctx, ChannelPromise future) + throws Exception { + if (this instanceof ChannelOutboundHandler) { + throw new IllegalStateException( + "flush(...) must be overridden by " + getClass().getName() + + ", which implements " + ChannelOutboundHandler.class.getSimpleName()); + } + ctx.flush(future); + } + + /** + * Calls {@link ChannelHandlerContext#sendFile(FileRegion, ChannelPromise)} to forward + * to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}. + * + * Sub-classes may override this method to change behavior. + */ + @Override + public void sendFile(ChannelHandlerContext ctx, FileRegion region, ChannelPromise future) throws Exception { + ctx.sendFile(region, future); + } +} diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java index d515db640f..baede4d778 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2012 The Netty Project + * Copyright 2013 The Netty Project * * The Netty Project licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance @@ -13,112 +13,75 @@ * License for the specific language governing permissions and limitations * under the License. */ + package io.netty.channel; -import java.net.SocketAddress; +public abstract class ChannelHandlerAdapter implements ChannelHandler { -/** - * {@link ChannelHandler} implementation which represents a combination out of a {@link ChannelStateHandler} and - * the {@link ChannelOperationHandler}. - * - * It is a good starting point if your {@link ChannelHandler} implementation needs to intercept operations and also - * state updates. - */ -public abstract class ChannelHandlerAdapter extends ChannelStateHandlerAdapter implements ChannelOperationHandler { + // Not using volatile because it's used only for a sanity check. + boolean added; /** - * Calls {@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)} to forward - * to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}. - * - * Sub-classes may override this method to change behavior. + * Return {@code true} if the implementation is {@link Sharable} and so can be added + * to different {@link ChannelPipeline}s. */ - @Override - public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, - ChannelPromise future) throws Exception { - ctx.bind(localAddress, future); + final boolean isSharable() { + return getClass().isAnnotationPresent(Sharable.class); } /** - * Calls {@link ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)} to forward - * to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}. - * - * Sub-classes may override this method to change behavior. + * Do nothing by default, sub-classes may override this method. */ @Override - public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, - SocketAddress localAddress, ChannelPromise future) throws Exception { - ctx.connect(remoteAddress, localAddress, future); + public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + // NOOP } /** - * Calls {@link ChannelHandlerContext#disconnect(ChannelPromise)} to forward - * to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}. + * Do nothing by default, sub-classes may override this method. + */ + @Override + public void afterAdd(ChannelHandlerContext ctx) throws Exception { + // NOOP + } + + /** + * Do nothing by default, sub-classes may override this method. + */ + @Override + public void beforeRemove(ChannelHandlerContext ctx) throws Exception { + // NOOP + } + + /** + * Do nothing by default, sub-classes may override this method. + */ + @Override + public void afterRemove(ChannelHandlerContext ctx) throws Exception { + // NOOP + } + + /** + * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} to forward + * to the next {@link ChannelHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Override - public void disconnect(ChannelHandlerContext ctx, ChannelPromise future) + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - ctx.disconnect(future); + ctx.fireExceptionCaught(cause); } /** - * Calls {@link ChannelHandlerContext#close(ChannelPromise)} to forward - * to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}. + * Calls {@link ChannelHandlerContext#fireUserEventTriggered(Object)} to forward + * to the next {@link ChannelHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Override - public void close(ChannelHandlerContext ctx, ChannelPromise future) + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - ctx.close(future); - } - - /** - * Calls {@link ChannelHandlerContext#close(ChannelPromise)} to forward - * to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}. - * - * Sub-classes may override this method to change behavior. - */ - @Override - public void deregister(ChannelHandlerContext ctx, ChannelPromise future) - throws Exception { - ctx.deregister(future); - } - - @Override - public void read(ChannelHandlerContext ctx) { - ctx.read(); - } - - /** - * Calls {@link ChannelHandlerContext#flush(ChannelPromise)} to forward - * to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}. - * - * Sub-classes may override this method to change behavior. - * - * Be aware that if your class also implement {@link ChannelOutboundHandler} it need to {@code @Override} this - * method and provide some proper implementation. Fail to do so, will result in an {@link IllegalStateException}! - */ - @Override - public void flush(ChannelHandlerContext ctx, ChannelPromise future) - throws Exception { - if (this instanceof ChannelOutboundHandler) { - throw new IllegalStateException( - "flush(...) must be overridden by " + getClass().getName() + - ", which implements " + ChannelOutboundHandler.class.getSimpleName()); - } - ctx.flush(future); - } - - /** - * Calls {@link ChannelHandlerContext#sendFile(FileRegion, ChannelPromise)} to forward - * to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}. - * - * Sub-classes may override this method to change behavior. - */ - @Override - public void sendFile(ChannelHandlerContext ctx, FileRegion region, ChannelPromise future) throws Exception { - ctx.sendFile(region, future); + ctx.fireUserEventTriggered(evt); } } diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java index 121a1d96c2..cea98b0f46 100755 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java @@ -47,7 +47,7 @@ import java.util.Set; * You can keep the {@link ChannelHandlerContext} for later use, such as * triggering an event outside the handler methods, even from a different thread. *
- * public class MyHandler extends {@link ChannelHandlerAdapter} {
+ * public class MyHandler extends {@link ChannelDuplexHandler} {
  *
  *     private {@link ChannelHandlerContext} ctx;
  *
@@ -293,52 +293,22 @@ public interface ChannelHandlerContext
      MessageBuf replaceOutboundMessageBuffer(MessageBuf newOutboundMsgBuf);
 
     /**
-     * Return {@code true} if the next {@link ChannelHandlerContext} has a {@link ByteBuf} for handling
-     * inbound data.
-     */
-    boolean hasNextInboundByteBuffer();
-
-    /**
-     * Return {@code true} if the next {@link ChannelHandlerContext} has a {@link MessageBuf} for handling
-     * inbound data.
-     */
-    boolean hasNextInboundMessageBuffer();
-
-    /**
-     * Return the {@link ByteBuf} of the next {@link ChannelHandlerContext} if {@link #hasNextInboundByteBuffer()}
-     * returned {@code true}, otherwise a {@link UnsupportedOperationException} is thrown.
+     * Return the {@link ByteBuf} of the next {@link ChannelHandlerContext}.
      */
     ByteBuf nextInboundByteBuffer();
 
     /**
-     * Return the {@link MessageBuf} of the next {@link ChannelHandlerContext} if
-     * {@link #hasNextInboundMessageBuffer()} returned {@code true}, otherwise a
-     * {@link UnsupportedOperationException} is thrown.
+     * Return the {@link MessageBuf} of the next {@link ChannelHandlerContext}.
      */
     MessageBuf nextInboundMessageBuffer();
 
     /**
-     * Return {@code true} if the next {@link ChannelHandlerContext} has a {@link ByteBuf} for handling outbound
-     * data.
-     */
-    boolean hasNextOutboundByteBuffer();
-
-    /**
-     * Return {@code true} if the next {@link ChannelHandlerContext} has a {@link MessageBuf} for handling
-     * outbound data.
-     */
-    boolean hasNextOutboundMessageBuffer();
-
-    /**
-     * Return the {@link ByteBuf} of the next {@link ChannelHandlerContext} if {@link #hasNextOutboundByteBuffer()}
-     * returned {@code true}, otherwise a {@link UnsupportedOperationException} is thrown.
+     * Return the {@link ByteBuf} of the next {@link ChannelHandlerContext}.
      */
     ByteBuf nextOutboundByteBuffer();
 
     /**
-     * Return the {@link MessageBuf} of the next {@link ChannelHandlerContext} if
-     * {@link #hasNextOutboundMessageBuffer()} returned {@code true}, otherwise a
-     * {@link UnsupportedOperationException} is thrown.
+     * Return the {@link MessageBuf} of the next {@link ChannelHandlerContext}.
      */
     MessageBuf nextOutboundMessageBuffer();
 }
diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java b/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java
index e63ebba84a..1c431b9e4b 100644
--- a/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java
+++ b/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java
@@ -64,37 +64,12 @@ public final class ChannelHandlerUtil {
         }
 
         if (inbound) {
-            if (ctx.hasNextInboundMessageBuffer()) {
-                ctx.nextInboundMessageBuffer().add(msg);
-                return true;
-            }
-
-            if (msg instanceof ByteBuf && ctx.hasNextInboundByteBuffer()) {
-                ByteBuf altDst = ctx.nextInboundByteBuffer();
-                ByteBuf src = (ByteBuf) msg;
-                altDst.writeBytes(src, src.readerIndex(), src.readableBytes());
-                return true;
-            }
-        } else {
-            if (ctx.hasNextOutboundMessageBuffer()) {
-                ctx.nextOutboundMessageBuffer().add(msg);
-                return true;
-            }
-
-            if (msg instanceof ByteBuf && ctx.hasNextOutboundByteBuffer()) {
-                ByteBuf altDst = ctx.nextOutboundByteBuffer();
-                ByteBuf src = (ByteBuf) msg;
-                altDst.writeBytes(src, src.readerIndex(), src.readableBytes());
-                return true;
-            }
+            ctx.nextInboundMessageBuffer().add(msg);
+            return true;
         }
 
-        throw new NoSuchBufferException(String.format(
-                "the handler '%s' could not find a %s which accepts a %s.",
-                ctx.name(),
-                inbound? ChannelInboundHandler.class.getSimpleName()
-                       : ChannelOutboundHandler.class.getSimpleName(),
-                msg.getClass().getSimpleName()));
+        ctx.nextOutboundMessageBuffer().add(msg);
+        return true;
     }
 
     private static final Class[] EMPTY_TYPES = new Class[0];
diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundByteHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundByteHandlerAdapter.java
index ede1eca472..84592e8e03 100644
--- a/transport/src/main/java/io/netty/channel/ChannelInboundByteHandlerAdapter.java
+++ b/transport/src/main/java/io/netty/channel/ChannelInboundByteHandlerAdapter.java
@@ -19,13 +19,13 @@ import io.netty.buffer.ByteBuf;
 
 
 /**
- * Abstract base class for {@link ChannelInboundHandlerAdapter} which should be extended by the user to
+ * Abstract base class for {@link ChannelInboundByteHandler} which should be extended by the user to
  * get notified once more data is ready to get consumed from the inbound {@link ByteBuf}.
  *
  * This implementation is a good starting point for most users.
  */
 public abstract class ChannelInboundByteHandlerAdapter
-        extends ChannelInboundHandlerAdapter implements ChannelInboundByteHandler {
+        extends ChannelStateHandlerAdapter implements ChannelInboundByteHandler {
 
     /**
      * Create a new unpooled {@link ByteBuf} by default. Sub-classes may override this to offer a more
diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java
deleted file mode 100644
index 5af0aa0bbd..0000000000
--- a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright 2012 The Netty Project
- *
- * The Netty Project licenses this file to you under the Apache License,
- * version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at:
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package io.netty.channel;
-
-
-import io.netty.buffer.Buf;
-
-/**
- * Abstract base class for a {@link ChannelHandler} that handles inbound data.
- *
- * Please either extend {@link ChannelInboundByteHandlerAdapter} or
- * {@link ChannelInboundMessageHandlerAdapter}.
- */
-abstract class ChannelInboundHandlerAdapter
-        extends ChannelStateHandlerAdapter implements ChannelInboundHandler {
-
-    /**
-     * Calls {@link Buf#free()} to free the buffer, sub-classes may override this.
-     *
-     * When doing so be aware that you will need to handle all the resource management by your own.
-     */
-    @Override
-    public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
-        if (ctx.hasInboundByteBuffer()) {
-            ctx.inboundByteBuffer().free();
-        } else {
-            ctx.inboundMessageBuffer().free();
-        }
-    }
-}
diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java
index c1a4b099af..d44e1c9c06 100644
--- a/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java
+++ b/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java
@@ -85,8 +85,8 @@ public interface ChannelInboundInvoker {
     void fireInboundBufferUpdated();
 
     /**
-     * Triggers an {@link ChannelStateHandler#channelReadSuspended(ChannelHandlerContext) inboundBufferSuspended}
+     * Triggers an {@link ChannelStateHandler#channelReadSuspended(ChannelHandlerContext) channelReadSuspended}
      * event to the next {@link ChannelStateHandler} in the {@link ChannelPipeline}.
      */
-    void fireInboundBufferSuspended();
+    void fireChannelReadSuspended();
 }
diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java
index b354dbdd40..a5d3ab02dd 100644
--- a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java
+++ b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java
@@ -41,7 +41,7 @@ import io.netty.buffer.Unpooled;
  * @param    The type of the messages to handle
  */
 public abstract class ChannelInboundMessageHandlerAdapter
-        extends ChannelInboundHandlerAdapter implements ChannelInboundMessageHandler {
+        extends ChannelStateHandlerAdapter implements ChannelInboundMessageHandler {
 
     private final Class[] acceptedMsgTypes;
 
@@ -73,6 +73,7 @@ public abstract class ChannelInboundMessageHandlerAdapter
 
         try {
             MessageBuf in = ctx.inboundMessageBuffer();
+            MessageBuf out = null;
             for (;;) {
                 Object msg = in.poll();
                 if (msg == null) {
@@ -80,7 +81,10 @@ public abstract class ChannelInboundMessageHandlerAdapter
                 }
                 try {
                     if (!isSupported(msg)) {
-                        ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
+                        if (out == null) {
+                            out = ctx.nextOutboundMessageBuffer();
+                        }
+                        out.add(msg);
                         unsupportedFound = true;
                         continue;
                     }
diff --git a/transport/src/main/java/io/netty/channel/ChannelOperationHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOperationHandlerAdapter.java
index 0093925f79..a009f2123d 100644
--- a/transport/src/main/java/io/netty/channel/ChannelOperationHandlerAdapter.java
+++ b/transport/src/main/java/io/netty/channel/ChannelOperationHandlerAdapter.java
@@ -17,63 +17,7 @@ package io.netty.channel;
 
 import java.net.SocketAddress;
 
-public abstract class ChannelOperationHandlerAdapter implements ChannelOperationHandler {
-
-    /**
-     * Do nothing by default, sub-classes may override this method.
-     */
-    @Override
-    public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
-        // NOOP
-    }
-
-    /**
-     * Do nothing by default, sub-classes may override this method.
-     */
-    @Override
-    public void afterAdd(ChannelHandlerContext ctx) throws Exception {
-        // NOOP
-    }
-
-    /**
-     * Do nothing by default, sub-classes may override this method.
-     */
-    @Override
-    public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
-        // NOOP
-    }
-
-    /**
-     * Do nothing by default, sub-classes may override this method.
-     */
-    @Override
-    public void afterRemove(ChannelHandlerContext ctx) throws Exception {
-        // NOOP
-    }
-
-    /**
-     * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} to forward
-     * to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
-     *
-     * Sub-classes may override this method to change behavior.
-     */
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-            throws Exception {
-        ctx.fireExceptionCaught(cause);
-    }
-
-    /**
-     * Calls {@link ChannelHandlerContext#fireUserEventTriggered(Object)} to forward
-     * to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
-     *
-     * Sub-classes may override this method to change behavior.
-     */
-    @Override
-    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
-            throws Exception {
-        ctx.fireUserEventTriggered(evt);
-    }
+public abstract class ChannelOperationHandlerAdapter extends ChannelHandlerAdapter implements ChannelOperationHandler {
 
     /**
      * Calls {@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)} to forward
diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundByteHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundByteHandlerAdapter.java
index 3a9ba82ede..ad3573e8e5 100644
--- a/transport/src/main/java/io/netty/channel/ChannelOutboundByteHandlerAdapter.java
+++ b/transport/src/main/java/io/netty/channel/ChannelOutboundByteHandlerAdapter.java
@@ -21,7 +21,7 @@ import io.netty.buffer.ByteBuf;
  * Abstract base class which handles outgoing bytes.
  */
 public abstract class ChannelOutboundByteHandlerAdapter
-        extends ChannelOutboundHandlerAdapter implements ChannelOutboundByteHandler {
+        extends ChannelOperationHandlerAdapter implements ChannelOutboundByteHandler {
     @Override
     public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
         return ctx.alloc().buffer();
@@ -36,4 +36,21 @@ public abstract class ChannelOutboundByteHandlerAdapter
     public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
         ctx.outboundByteBuffer().free();
     }
+
+    /**
+     * This method merely delegates the flush request to {@link #flush(ChannelHandlerContext, ByteBuf, ChannelPromise)}.
+     */
+    @Override
+    public final void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+        flush(ctx, ctx.outboundByteBuffer(), promise);
+    }
+
+    /**
+     * Invoked when a flush request has been issued.
+     *
+     * @param ctx the current context
+     * @param in this handler's outbound buffer
+     * @param promise the promise associate with the current flush request
+     */
+    protected abstract void flush(ChannelHandlerContext ctx, ByteBuf in, ChannelPromise promise) throws Exception;
 }
diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java
deleted file mode 100644
index fce8bbc13b..0000000000
--- a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright 2012 The Netty Project
- *
- * The Netty Project licenses this file to you under the Apache License,
- * version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at:
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package io.netty.channel;
-
-import io.netty.buffer.Buf;
-
-
-/**
- * Abstract base class for a {@link ChannelHandler} that handles outbound data.
- *
- * Please extend {@link ChannelOutboundByteHandlerAdapter} or
- * {@link ChannelOutboundMessageHandlerAdapter}.
- */
-abstract class ChannelOutboundHandlerAdapter
-        extends ChannelOperationHandlerAdapter implements ChannelOutboundHandler {
-
-    /**
-     * Calls {@link Buf#free()} to free the buffer, sub-classes may override this.
-     *
-     * When doing so be aware that you will need to handle all the resource management by your own.
-     */
-    @Override
-    public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
-        if (ctx.hasOutboundByteBuffer()) {
-            ctx.outboundByteBuffer().free();
-        } else {
-            ctx.outboundMessageBuffer().free();
-        }
-    }
-}
diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java
index 7861004c7c..0ba1a528eb 100644
--- a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java
+++ b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java
@@ -157,7 +157,7 @@ public interface ChannelOutboundInvoker {
      * Reads data from the {@link Channel} into the first inbound buffer, triggers an
      * {@link ChannelStateHandler#inboundBufferUpdated(ChannelHandlerContext) inboundBufferUpdated} event if data was
      * read, and triggers an
-     * {@link ChannelStateHandler#channelReadSuspended(ChannelHandlerContext) inboundBufferSuspended} event so the
+     * {@link ChannelStateHandler#channelReadSuspended(ChannelHandlerContext) channelReadSuspended} event so the
      * handler can decide to continue reading.  If there's a pending read operation already, this method does nothing.
      */
     void read();
diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java
index 53b8eb9c41..323ac74bc9 100644
--- a/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java
+++ b/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java
@@ -24,7 +24,7 @@ import io.netty.buffer.Unpooled;
  * @param    The type of the messages to handle
  */
 public abstract class ChannelOutboundMessageHandlerAdapter
-        extends ChannelOutboundHandlerAdapter implements ChannelOutboundMessageHandler {
+        extends ChannelOperationHandlerAdapter implements ChannelOutboundMessageHandler {
 
     @Override
     public MessageBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
diff --git a/transport/src/main/java/io/netty/channel/ChannelStateHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelStateHandlerAdapter.java
index aae10891f2..32286460e6 100644
--- a/transport/src/main/java/io/netty/channel/ChannelStateHandlerAdapter.java
+++ b/transport/src/main/java/io/netty/channel/ChannelStateHandlerAdapter.java
@@ -23,73 +23,7 @@ package io.netty.channel;
  * This implementation just forward the operation to the next {@link ChannelHandler} in the
  * {@link ChannelPipeline}. Sub-classes may override a method implementation to change this.
  */
-public class ChannelStateHandlerAdapter implements ChannelStateHandler {
-
-    // Not using volatile because it's used only for a sanity check.
-    boolean added;
-
-    /**
-     * Return {@code true} if the implementation is {@link Sharable} and so can be added
-     * to different {@link ChannelPipeline}s.
-     */
-    final boolean isSharable() {
-        return getClass().isAnnotationPresent(Sharable.class);
-    }
-    /**
-     * Do nothing by default, sub-classes may override this method.
-     */
-    @Override
-    public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
-        // NOOP
-    }
-
-    /**
-     * Do nothing by default, sub-classes may override this method.
-     */
-    @Override
-    public void afterAdd(ChannelHandlerContext ctx) throws Exception {
-        // NOOP
-    }
-
-    /**
-     * Do nothing by default, sub-classes may override this method.
-     */
-    @Override
-    public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
-        // NOOP
-    }
-
-    /**
-     * Do nothing by default, sub-classes may override this method.
-     */
-    @Override
-    public void afterRemove(ChannelHandlerContext ctx) throws Exception {
-        // NOOP
-    }
-
-    /**
-     * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} to forward
-     * to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
-     *
-     * Sub-classes may override this method to change behavior.
-     */
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-            throws Exception {
-        ctx.fireExceptionCaught(cause);
-    }
-
-    /**
-     * Calls {@link ChannelHandlerContext#fireUserEventTriggered(Object)} to forward
-     * to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
-     *
-     * Sub-classes may override this method to change behavior.
-     */
-    @Override
-    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
-            throws Exception {
-        ctx.fireUserEventTriggered(evt);
-    }
+public abstract class ChannelStateHandlerAdapter extends ChannelHandlerAdapter implements ChannelStateHandler {
 
     /**
      * Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward
@@ -156,6 +90,6 @@ public class ChannelStateHandlerAdapter implements ChannelStateHandler {
 
     @Override
     public void channelReadSuspended(ChannelHandlerContext ctx) throws Exception {
-        ctx.fireInboundBufferSuspended();
+        ctx.fireChannelReadSuspended();
     }
 }
diff --git a/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java b/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java
new file mode 100644
index 0000000000..3329197937
--- /dev/null
+++ b/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java
@@ -0,0 +1,244 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.channel;
+
+import java.net.SocketAddress;
+
+/**
+ *  Combines a {@link ChannelStateHandler} and a {@link ChannelOperationHandler} into one {@link ChannelHandler}.
+ *
+ */
+public class CombinedChannelDuplexHandler extends ChannelDuplexHandler {
+
+    private ChannelStateHandler stateHandler;
+    private ChannelOperationHandler operationHandler;
+
+    /**
+     * Creates a new uninitialized instance. A class that extends this handler must invoke
+     * {@link #init(ChannelStateHandler, ChannelOperationHandler)} before adding this handler into a
+     * {@link ChannelPipeline}.
+     */
+    protected CombinedChannelDuplexHandler() { }
+
+    /**
+     * Creates a new instance that combines the specified two handlers into one.
+     */
+    public CombinedChannelDuplexHandler(ChannelStateHandler stateHandler, ChannelOperationHandler operationHandler) {
+        init(stateHandler, operationHandler);
+    }
+
+    /**
+     * Initialized this handler with the specified handlers.
+     *
+     * @throws IllegalStateException if this handler was not constructed via the default constructor or
+     *                               if this handler does not implement all required handler interfaces
+     * @throws IllegalArgumentException if the specified handlers cannot be combined into one due to a conflict
+     *                                  in the type hierarchy
+     */
+    protected final void init(ChannelStateHandler stateHandler, ChannelOperationHandler operationHandler) {
+        validate(stateHandler, operationHandler);
+        this.stateHandler = stateHandler;
+        this.operationHandler = operationHandler;
+    }
+
+    @SuppressWarnings("InstanceofIncompatibleInterface")
+    private void validate(ChannelStateHandler stateHandler, ChannelOperationHandler operationHandler) {
+        if (this.stateHandler != null) {
+            throw new IllegalStateException(
+                    "init() can not be invoked if " + CombinedChannelDuplexHandler.class.getSimpleName() +
+                            " was constructed with non-default constructor.");
+        }
+
+        if (stateHandler == null) {
+            throw new NullPointerException("stateHandler");
+        }
+        if (operationHandler == null) {
+            throw new NullPointerException("operationHandler");
+        }
+        if (stateHandler instanceof ChannelOperationHandler) {
+            throw new IllegalArgumentException(
+                    "stateHandler must not implement " +
+                    ChannelOperationHandler.class.getSimpleName() + " to get combined.");
+        }
+        if (operationHandler instanceof ChannelStateHandler) {
+            throw new IllegalArgumentException(
+                    "operationHandler must not implement " +
+                    ChannelStateHandler.class.getSimpleName() + " to get combined.");
+        }
+
+        if (stateHandler instanceof ChannelInboundByteHandler && !(this instanceof ChannelInboundByteHandler)) {
+            throw new IllegalStateException(
+                    getClass().getSimpleName() + " must implement " + ChannelInboundByteHandler.class.getSimpleName() +
+                    " if stateHandler implements " + ChannelInboundByteHandler.class.getSimpleName());
+        }
+
+        if (stateHandler instanceof ChannelInboundMessageHandler && !(this instanceof ChannelInboundMessageHandler)) {
+            throw new IllegalStateException(
+                    getClass().getSimpleName() + " must implement " +
+                    ChannelInboundMessageHandler.class.getSimpleName() + " if stateHandler implements " +
+                    ChannelInboundMessageHandler.class.getSimpleName());
+        }
+
+        if (operationHandler instanceof ChannelOutboundByteHandler && !(this instanceof ChannelOutboundByteHandler)) {
+            throw new IllegalStateException(
+                    getClass().getSimpleName() + " must implement " +
+                    ChannelOutboundByteHandler.class.getSimpleName() + " if operationHandler implements " +
+                    ChannelOutboundByteHandler.class.getSimpleName());
+        }
+
+        if (operationHandler instanceof ChannelOutboundMessageHandler &&
+            !(this instanceof ChannelOutboundMessageHandler)) {
+            throw new IllegalStateException(
+                    getClass().getSimpleName() + " must implement " +
+                    ChannelOutboundMessageHandler.class.getSimpleName() + " if operationHandler implements " +
+                    ChannelOutboundMessageHandler.class.getSimpleName());
+        }
+    }
+
+    protected final ChannelStateHandler stateHandler() {
+        return stateHandler;
+    }
+
+    protected final ChannelOperationHandler operationHandler() {
+        return operationHandler;
+    }
+
+    @Override
+    public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
+        if (stateHandler == null) {
+            throw new IllegalStateException(
+                    "init() must be invoked before being added to a " + ChannelPipeline.class.getSimpleName() +
+                    " if " +  CombinedChannelDuplexHandler.class.getSimpleName() +
+                    " was constructed with the default constructor.");
+        }
+
+        try {
+            stateHandler.beforeAdd(ctx);
+        } finally {
+            operationHandler.beforeAdd(ctx);
+        }
+    }
+
+    @Override
+    public void afterAdd(ChannelHandlerContext ctx) throws Exception {
+        try {
+            stateHandler.afterAdd(ctx);
+        } finally {
+            operationHandler.afterAdd(ctx);
+        }
+    }
+
+    @Override
+    public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
+        try {
+            stateHandler.beforeRemove(ctx);
+        } finally {
+            operationHandler.beforeRemove(ctx);
+        }
+    }
+
+    @Override
+    public void afterRemove(ChannelHandlerContext ctx) throws Exception {
+        try {
+            stateHandler.afterRemove(ctx);
+        } finally {
+            operationHandler.afterRemove(ctx);
+        }
+    }
+
+    @Override
+    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+        stateHandler.channelRegistered(ctx);
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+        stateHandler.channelUnregistered(ctx);
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        stateHandler.channelActive(ctx);
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        stateHandler.channelInactive(ctx);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        stateHandler.exceptionCaught(ctx, cause);
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        stateHandler.userEventTriggered(ctx, evt);
+    }
+
+    @Override
+    public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
+        stateHandler.inboundBufferUpdated(ctx);
+        if (stateHandler instanceof ChannelInboundByteHandler) {
+            ((ChannelInboundByteHandler) stateHandler).discardInboundReadBytes(ctx);
+        }
+    }
+
+    @Override
+    public void bind(
+            ChannelHandlerContext ctx,
+            SocketAddress localAddress, ChannelPromise promise) throws Exception {
+        operationHandler.bind(ctx, localAddress, promise);
+    }
+
+    @Override
+    public void connect(
+            ChannelHandlerContext ctx,
+            SocketAddress remoteAddress, SocketAddress localAddress,
+            ChannelPromise promise) throws Exception {
+        operationHandler.connect(ctx, remoteAddress, localAddress, promise);
+    }
+
+    @Override
+    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+        operationHandler.disconnect(ctx, promise);
+    }
+
+    @Override
+    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+        operationHandler.close(ctx, promise);
+    }
+
+    @Override
+    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+        operationHandler.deregister(ctx, promise);
+    }
+
+    @Override
+    public void read(ChannelHandlerContext ctx) {
+        operationHandler.read(ctx);
+    }
+
+    @Override
+    public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+        operationHandler.flush(ctx, promise);
+    }
+
+    @Override
+    public void sendFile(ChannelHandlerContext ctx, FileRegion region, ChannelPromise promise) throws Exception {
+        operationHandler.sendFile(ctx, region, promise);
+    }
+}
diff --git a/transport/src/main/java/io/netty/channel/CombinedChannelHandler.java b/transport/src/main/java/io/netty/channel/CombinedChannelHandler.java
deleted file mode 100644
index 007a464b2b..0000000000
--- a/transport/src/main/java/io/netty/channel/CombinedChannelHandler.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Copyright 2012 The Netty Project
- *
- * The Netty Project licenses this file to you under the Apache License,
- * version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at:
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package io.netty.channel;
-
-import io.netty.buffer.Buf;
-
-import java.net.SocketAddress;
-
-/**
- *  Combines a {@link ChannelInboundHandler} and a {@link ChannelOutboundHandler} into one {@link ChannelHandler}.
- *
- */
-public class CombinedChannelHandler extends ChannelStateHandlerAdapter implements ChannelInboundHandler,
-        ChannelOutboundHandler {
-
-    private ChannelOutboundHandler out;
-    private ChannelInboundHandler in;
-
-    protected CombinedChannelHandler() {
-        // User will call init in the subclass constructor.
-    }
-
-    /**
-     * Combine the given {@link ChannelInboundHandler} and {@link ChannelOutboundHandler}.
-     */
-    public CombinedChannelHandler(
-            ChannelInboundHandler inboundHandler, ChannelOutboundHandler outboundHandler) {
-        init(inboundHandler, outboundHandler);
-    }
-
-    /**
-     * Needs to get called before the handler can be added to the {@link ChannelPipeline}.
-     * Otherwise it will trigger a {@link IllegalStateException} later.
-     *
-     */
-    protected void init(ChannelInboundHandler inboundHandler, ChannelOutboundHandler outboundHandler) {
-        if (inboundHandler == null) {
-            throw new NullPointerException("inboundHandler");
-        }
-        if (outboundHandler == null) {
-            throw new NullPointerException("outboundHandler");
-        }
-        if (inboundHandler instanceof ChannelOperationHandler) {
-            throw new IllegalArgumentException(
-                    "inboundHandler must not implement " +
-                    ChannelOperationHandler.class.getSimpleName() + " to get combined.");
-        }
-        if (outboundHandler instanceof ChannelStateHandler) {
-            throw new IllegalArgumentException(
-                    "outboundHandler must not implement " +
-                    ChannelStateHandler.class.getSimpleName() + " to get combined.");
-        }
-
-        if (in != null) {
-            throw new IllegalStateException("init() cannot be called more than once.");
-        }
-
-        in = inboundHandler;
-        out = outboundHandler;
-    }
-
-    @Override
-    public Buf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
-        return in.newInboundBuffer(ctx);
-    }
-
-    @Override
-    public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
-        in.freeInboundBuffer(ctx);
-    }
-
-    @Override
-    public Buf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
-        return out.newOutboundBuffer(ctx);
-    }
-
-    @Override
-    public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
-        out.freeOutboundBuffer(ctx);
-    }
-
-    @Override
-    public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
-        if (in == null) {
-            throw new IllegalStateException(
-                    "not initialized yet - call init() in the constructor of the subclass");
-        }
-
-        try {
-            in.beforeAdd(ctx);
-        } finally {
-            out.beforeAdd(ctx);
-        }
-    }
-
-    @Override
-    public void afterAdd(ChannelHandlerContext ctx) throws Exception {
-        try {
-            in.afterAdd(ctx);
-        } finally {
-            out.afterAdd(ctx);
-        }
-    }
-
-    @Override
-    public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
-        try {
-            in.beforeRemove(ctx);
-        } finally {
-            out.beforeRemove(ctx);
-        }
-    }
-
-    @Override
-    public void afterRemove(ChannelHandlerContext ctx) throws Exception {
-        try {
-            in.afterRemove(ctx);
-        } finally {
-            out.afterRemove(ctx);
-        }
-    }
-
-    @Override
-    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
-        in.channelRegistered(ctx);
-    }
-
-    @Override
-    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
-        in.channelUnregistered(ctx);
-    }
-
-    @Override
-    public void channelActive(ChannelHandlerContext ctx) throws Exception {
-        in.channelActive(ctx);
-    }
-
-    @Override
-    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-        in.channelInactive(ctx);
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        in.exceptionCaught(ctx, cause);
-    }
-
-    @Override
-    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-        in.userEventTriggered(ctx, evt);
-    }
-
-    @Override
-    public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
-        in.inboundBufferUpdated(ctx);
-        if (in instanceof ChannelInboundByteHandler) {
-            ((ChannelInboundByteHandler) in).discardInboundReadBytes(ctx);
-        }
-    }
-
-    @Override
-    public void bind(
-            ChannelHandlerContext ctx,
-            SocketAddress localAddress, ChannelPromise promise) throws Exception {
-        out.bind(ctx, localAddress, promise);
-    }
-
-    @Override
-    public void connect(
-            ChannelHandlerContext ctx,
-            SocketAddress remoteAddress, SocketAddress localAddress,
-            ChannelPromise promise) throws Exception {
-        out.connect(ctx, remoteAddress, localAddress, promise);
-    }
-
-    @Override
-    public void disconnect(
-            ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
-        out.disconnect(ctx, promise);
-    }
-
-    @Override
-    public void close(
-            ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
-        out.close(ctx, promise);
-    }
-
-    @Override
-    public void deregister(
-            ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
-        out.deregister(ctx, promise);
-    }
-
-    @Override
-    public void read(ChannelHandlerContext ctx) {
-        out.read(ctx);
-    }
-
-    @Override
-    public void flush(
-            ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
-        out.flush(ctx, promise);
-        if (out instanceof ChannelOutboundByteHandler) {
-            ((ChannelOutboundByteHandler) out).discardOutboundReadBytes(ctx);
-        }
-    }
-
-    @Override
-    public void sendFile(ChannelHandlerContext ctx, FileRegion region, ChannelPromise promise) throws Exception {
-        out.sendFile(ctx, region, promise);
-    }
-}
diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java
index e04b65c9f8..ccd17cb030 100755
--- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java
+++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java
@@ -20,6 +20,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.MessageBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.channel.DefaultChannelPipeline.*;
 import io.netty.util.DefaultAttributeMap;
 
 import java.net.SocketAddress;
@@ -67,9 +68,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
     //
     // Note we use an AtomicReferenceFieldUpdater for atomic operations on these to safe memory. This will safe us
     // 64 bytes per Bridge.
+    @SuppressWarnings("UnusedDeclaration")
     private volatile MessageBridge inMsgBridge;
+    @SuppressWarnings("UnusedDeclaration")
     private volatile MessageBridge outMsgBridge;
+    @SuppressWarnings("UnusedDeclaration")
     private volatile ByteBridge inByteBridge;
+    @SuppressWarnings("UnusedDeclaration")
     private volatile ByteBridge outByteBridge;
 
     private static final AtomicReferenceFieldUpdater IN_MSG_BRIDGE_UPDATER
@@ -94,22 +99,15 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
     private Runnable invokeChannelInactiveTask;
     private Runnable invokeInboundBufferUpdatedTask;
     private Runnable fireInboundBufferUpdated0Task;
-    private Runnable invokeInboundBufferSuspendedTask;
+    private Runnable invokeChannelReadSuspendedTask;
     private Runnable invokeFreeInboundBuffer0Task;
     private Runnable invokeFreeOutboundBuffer0Task;
     private Runnable invokeRead0Task;
     volatile boolean removed;
 
-    DefaultChannelHandlerContext(
-            DefaultChannelPipeline pipeline, EventExecutorGroup group,
-            String name, ChannelHandler handler) {
-        this(pipeline, group, name, handler, false);
-    }
-
     @SuppressWarnings("unchecked")
     DefaultChannelHandlerContext(
-            DefaultChannelPipeline pipeline, EventExecutorGroup group,
-            String name, ChannelHandler handler, boolean needsLazyBufInit) {
+            DefaultChannelPipeline pipeline, EventExecutorGroup group, String name, ChannelHandler handler) {
 
         if (name == null) {
             throw new NullPointerException("name");
@@ -157,44 +155,63 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
             try {
                 buf = ((ChannelInboundHandler) handler).newInboundBuffer(this);
             } catch (Exception e) {
-                throw new ChannelPipelineException("A user handler failed to create a new inbound buffer.", e);
-            }
-
-            if (buf == null) {
-                throw new ChannelPipelineException("A user handler's newInboundBuffer() returned null");
+                throw new ChannelPipelineException(
+                        handler.getClass().getSimpleName() + ".newInboundBuffer() raised an exception.", e);
             }
 
             if (buf instanceof ByteBuf) {
                 inByteBuf = (ByteBuf) buf;
-                inByteBridge = null;
-                inMsgBuf = null;
-                inMsgBridge = null;
             } else if (buf instanceof MessageBuf) {
-                inByteBuf = null;
-                inByteBridge = null;
                 inMsgBuf = (MessageBuf) buf;
-                inMsgBridge = null;
             } else {
-                throw new Error();
+                throw new ChannelPipelineException(
+                        handler.getClass().getSimpleName() + ".newInboundBuffer() returned neither " +
+                        ByteBuf.class.getSimpleName() + " nor " + MessageBuf.class.getSimpleName() + ": " + buf);
             }
-        } else {
-            inByteBridge = null;
-            inMsgBridge = null;
         }
 
         if (handler instanceof ChannelOutboundHandler) {
-            if (needsLazyBufInit) {
-                // Special case: it means this context is for HeadHandler.
-                // HeadHandler is an outbound handler instantiated by the constructor of DefaultChannelPipeline.
-                // Because Channel is not really fully initialized at this point, we should not call
-                // newOutboundBuffer() yet because it will usually lead to NPE.
-                // To work around this problem, we lazily initialize the outbound buffer for this special case.
+            Buf buf;
+            try {
+                buf = ((ChannelOutboundHandler) handler).newOutboundBuffer(this);
+            } catch (Exception e) {
+                throw new ChannelPipelineException(
+                        handler.getClass().getSimpleName() + ".newOutboundBuffer() raised an exception.", e);
+            }
+
+            if (buf instanceof ByteBuf) {
+                outByteBuf = (ByteBuf) buf;
+            } else if (buf instanceof MessageBuf) {
+                @SuppressWarnings("unchecked")
+                MessageBuf msgBuf = (MessageBuf) buf;
+                outMsgBuf = msgBuf;
             } else {
-                initOutboundBuffer();
+                throw new ChannelPipelineException(
+                        handler.getClass().getSimpleName() + ".newOutboundBuffer() returned neither " +
+                        ByteBuf.class.getSimpleName() + " nor " + MessageBuf.class.getSimpleName() + ": " + buf);
             }
         }
+    }
 
-        this.needsLazyBufInit = needsLazyBufInit;
+    DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, String name, HeadHandler handler) {
+        type = null;
+        channel = pipeline.channel;
+        this.pipeline = pipeline;
+        this.name = name;
+        this.handler = handler;
+        executor = null;
+        needsLazyBufInit = true;
+    }
+
+    DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, String name, TailHandler handler) {
+        type = null;
+        channel = pipeline.channel;
+        this.pipeline = pipeline;
+        this.name = name;
+        this.handler = handler;
+        executor = null;
+        inByteBuf = handler.byteSink;
+        inMsgBuf = handler.msgSink;
     }
 
     void forwardBufferContent() {
@@ -233,58 +250,32 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
         }
     }
 
-    private void lazyInitOutboundBuffer() {
+    private void lazyInitHeadHandler() {
         if (needsLazyBufInit) {
-            if (outByteBuf == null && outMsgBuf == null) {
-                needsLazyBufInit = false;
-                EventExecutor exec = executor();
-                if (exec.inEventLoop()) {
-                    initOutboundBuffer();
-                } else {
-                    try {
-                        getFromFuture(exec.submit(new Runnable() {
-                            @Override
-                            public void run() {
-                                lazyInitOutboundBuffer();
-                            }
-                        }));
-                    } catch (Exception e) {
-                        throw new ChannelPipelineException("failed to initialize an outbound buffer lazily", e);
-                    }
+            EventExecutor exec = executor();
+            if (exec.inEventLoop()) {
+                if (needsLazyBufInit) {
+                    needsLazyBufInit = false;
+                    HeadHandler headHandler = (HeadHandler) handler;
+                    headHandler.init(this);
+                    outByteBuf = headHandler.byteSink;
+                    outMsgBuf = headHandler.msgSink;
+                }
+            } else {
+                try {
+                    getFromFuture(exec.submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            lazyInitHeadHandler();
+                        }
+                    }));
+                } catch (Exception e) {
+                    throw new ChannelPipelineException("failed to initialize an outbound buffer lazily", e);
                 }
             }
         }
     }
 
-    private void initOutboundBuffer() {
-        Buf buf;
-        try {
-            buf = ((ChannelOutboundHandler) handler()).newOutboundBuffer(this);
-        } catch (Exception e) {
-            throw new ChannelPipelineException("A user handler failed to create a new outbound buffer.", e);
-        }
-
-        if (buf == null) {
-            throw new ChannelPipelineException("A user handler's newOutboundBuffer() returned null");
-        }
-
-        if (buf instanceof ByteBuf) {
-            outByteBuf = (ByteBuf) buf;
-            outByteBridge = null;
-            outMsgBuf = null;
-            outMsgBridge = null;
-        } else if (buf instanceof MessageBuf) {
-            outByteBuf = null;
-            outByteBridge = null;
-            @SuppressWarnings("unchecked")
-            MessageBuf msgBuf = (MessageBuf) buf;
-            outMsgBuf = msgBuf;
-            outMsgBridge = null;
-        } else {
-            throw new Error();
-        }
-    }
-
     private void fillBridge() {
         if (inMsgBridge != null) {
             MessageBridge bridge = inMsgBridge;
@@ -708,88 +699,16 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
         return currentOutboundMsgBuf;
     }
 
-    @Override
-    public boolean hasNextInboundByteBuffer() {
-        DefaultChannelHandlerContext ctx = next;
-        for (;;) {
-            if (ctx == null) {
-                return false;
-            }
-            if (ctx.hasInboundByteBuffer()) {
-                return true;
-            }
-            ctx = ctx.next;
-        }
-    }
-
-    @Override
-    public boolean hasNextInboundMessageBuffer() {
-        DefaultChannelHandlerContext ctx = next;
-        for (;;) {
-            if (ctx == null) {
-                return false;
-            }
-            if (ctx.hasInboundMessageBuffer()) {
-                return true;
-            }
-            ctx = ctx.next;
-        }
-    }
-
-    @Override
-    public boolean hasNextOutboundByteBuffer() {
-        DefaultChannelHandlerContext ctx = prev;
-        for (;;) {
-            if (ctx == null) {
-                return false;
-            }
-
-            if (ctx.hasOutboundByteBuffer()) {
-                return true;
-            }
-
-            ctx = ctx.prev;
-        }
-    }
-
-    @Override
-    public boolean hasNextOutboundMessageBuffer() {
-        DefaultChannelHandlerContext ctx = prev;
-        for (;;) {
-            if (ctx == null) {
-                return false;
-            }
-
-            if (ctx.hasOutboundMessageBuffer()) {
-                return true;
-            }
-
-            ctx = ctx.prev;
-        }
-    }
-
     @Override
     public ByteBuf nextInboundByteBuffer() {
         DefaultChannelHandlerContext ctx = next;
         for (;;) {
-            if (ctx == null) {
-                if (prev != null) {
-                    throw new NoSuchBufferException(String.format(
-                            "the handler '%s' could not find a %s whose inbound buffer is %s.",
-                            name, ChannelInboundHandler.class.getSimpleName(),
-                            ByteBuf.class.getSimpleName()));
-                } else {
-                    throw new NoSuchBufferException(String.format(
-                            "the pipeline does not contain a %s whose inbound buffer is %s.",
-                            ChannelInboundHandler.class.getSimpleName(),
-                            ByteBuf.class.getSimpleName()));
-                }
-            }
             if (ctx.hasInboundByteBuffer()) {
-                if (ctx.executor().inEventLoop()) {
-                    return ctx.inboundByteBuffer();
+                Thread currentThread = Thread.currentThread();
+                if (ctx.executor().inEventLoop(currentThread)) {
+                    return ctx.inByteBuf;
                 }
-                if (executor().inEventLoop()) {
+                if (executor().inEventLoop(currentThread)) {
                     ByteBridge bridge = ctx.inByteBridge;
                     if (bridge == null) {
                         bridge = new ByteBridge(ctx);
@@ -809,25 +728,12 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
     public MessageBuf nextInboundMessageBuffer() {
         DefaultChannelHandlerContext ctx = next;
         for (;;) {
-            if (ctx == null) {
-                if (prev != null) {
-                    throw new NoSuchBufferException(String.format(
-                            "the handler '%s' could not find a %s whose inbound buffer is %s.",
-                            name, ChannelInboundHandler.class.getSimpleName(),
-                            MessageBuf.class.getSimpleName()));
-                } else {
-                    throw new NoSuchBufferException(String.format(
-                            "the pipeline does not contain a %s whose inbound buffer is %s.",
-                            ChannelInboundHandler.class.getSimpleName(),
-                            MessageBuf.class.getSimpleName()));
-                }
-            }
-
             if (ctx.hasInboundMessageBuffer()) {
-                if (ctx.executor().inEventLoop()) {
-                    return ctx.inboundMessageBuffer();
+                Thread currentThread = Thread.currentThread();
+                if (ctx.executor().inEventLoop(currentThread)) {
+                    return ctx.inMsgBuf;
                 }
-                if (executor().inEventLoop()) {
+                if (executor().inEventLoop(currentThread)) {
                     MessageBridge bridge = ctx.inMsgBridge;
                     if (bridge == null) {
                         bridge = new MessageBridge();
@@ -846,13 +752,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
     @Override
     public ByteBuf nextOutboundByteBuffer() {
         DefaultChannelHandlerContext ctx = prev;
-        final DefaultChannelHandlerContext initialCtx = ctx;
         for (;;) {
             if (ctx.hasOutboundByteBuffer()) {
-                if (ctx.executor().inEventLoop()) {
+                Thread currentThread = Thread.currentThread();
+                if (ctx.executor().inEventLoop(currentThread)) {
                     return ctx.outboundByteBuffer();
                 }
-                if (executor().inEventLoop()) {
+                if (executor().inEventLoop(currentThread)) {
                     ByteBridge bridge = ctx.outByteBridge;
                     if (bridge == null) {
                         bridge = new ByteBridge(ctx);
@@ -865,33 +771,19 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
                 throw new IllegalStateException("nextOutboundByteBuffer() called from outside the eventLoop");
             }
             ctx = ctx.prev;
-
-            if (ctx == null) {
-                if (initialCtx != null && initialCtx.next != null) {
-                    throw new NoSuchBufferException(String.format(
-                            "the handler '%s' could not find a %s whose outbound buffer is %s.",
-                            initialCtx.next.name(), ChannelOutboundHandler.class.getSimpleName(),
-                            ByteBuf.class.getSimpleName()));
-                } else {
-                    throw new NoSuchBufferException(String.format(
-                            "the pipeline does not contain a %s whose outbound buffer is %s.",
-                            ChannelOutboundHandler.class.getSimpleName(),
-                            ByteBuf.class.getSimpleName()));
-                }
-            }
         }
     }
 
     @Override
     public MessageBuf nextOutboundMessageBuffer() {
         DefaultChannelHandlerContext ctx = prev;
-        final DefaultChannelHandlerContext initialCtx = ctx;
         for (;;) {
             if (ctx.hasOutboundMessageBuffer()) {
-                if (ctx.executor().inEventLoop()) {
+                Thread currentThread = Thread.currentThread();
+                if (ctx.executor().inEventLoop(currentThread)) {
                     return ctx.outboundMessageBuffer();
                 }
-                if (executor().inEventLoop()) {
+                if (executor().inEventLoop(currentThread)) {
                     MessageBridge bridge = ctx.outMsgBridge;
                     if (bridge == null) {
                         bridge = new MessageBridge();
@@ -904,43 +796,27 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
                 throw new IllegalStateException("nextOutboundMessageBuffer() called from outside the eventLoop");
             }
             ctx = ctx.prev;
-
-            if (ctx == null) {
-                if (initialCtx.next != null) {
-                    throw new NoSuchBufferException(String.format(
-                            "the handler '%s' could not find a %s whose outbound buffer is %s.",
-                            initialCtx.next.name(), ChannelOutboundHandler.class.getSimpleName(),
-                            MessageBuf.class.getSimpleName()));
-                } else {
-                    throw new NoSuchBufferException(String.format(
-                            "the pipeline does not contain a %s whose outbound buffer is %s.",
-                            ChannelOutboundHandler.class.getSimpleName(),
-                            MessageBuf.class.getSimpleName()));
-                }
-            }
         }
     }
 
     @Override
     public void fireChannelRegistered() {
-        lazyInitOutboundBuffer();
+        lazyInitHeadHandler();
         final DefaultChannelHandlerContext next = findContextInbound();
-        if (next != null) {
-            EventExecutor executor = next.executor();
-            if (executor.inEventLoop()) {
-                next.invokeChannelRegistered();
-            } else {
-                Runnable task = next.invokeChannelRegisteredTask;
-                if (task == null) {
-                    next.invokeChannelRegisteredTask = task = new Runnable() {
-                        @Override
-                        public void run() {
-                            next.invokeChannelRegistered();
-                        }
-                    };
-                }
-                executor.execute(task);
+        EventExecutor executor = next.executor();
+        if (executor.inEventLoop()) {
+            next.invokeChannelRegistered();
+        } else {
+            Runnable task = next.invokeChannelRegisteredTask;
+            if (task == null) {
+                next.invokeChannelRegisteredTask = task = new Runnable() {
+                    @Override
+                    public void run() {
+                        next.invokeChannelRegistered();
+                    }
+                };
             }
+            executor.execute(task);
         }
     }
 
@@ -957,22 +833,20 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
     @Override
     public void fireChannelUnregistered() {
         final DefaultChannelHandlerContext next = findContextInbound();
-        if (next != null) {
-            EventExecutor executor = next.executor();
-            if (prev != null && executor.inEventLoop()) {
-                next.invokeChannelUnregistered();
-            } else {
-                Runnable task = next.invokeChannelUnregisteredTask;
-                if (task == null) {
-                    next.invokeChannelUnregisteredTask = task = new Runnable() {
-                        @Override
-                        public void run() {
-                            next.invokeChannelUnregistered();
-                        }
-                    };
-                }
-                executor.execute(task);
+        EventExecutor executor = next.executor();
+        if (prev != null && executor.inEventLoop()) {
+            next.invokeChannelUnregistered();
+        } else {
+            Runnable task = next.invokeChannelUnregisteredTask;
+            if (task == null) {
+                next.invokeChannelUnregisteredTask = task = new Runnable() {
+                    @Override
+                    public void run() {
+                        next.invokeChannelUnregistered();
+                    }
+                };
             }
+            executor.execute(task);
         }
     }
 
@@ -986,23 +860,22 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
 
     @Override
     public void fireChannelActive() {
+        lazyInitHeadHandler();
         final DefaultChannelHandlerContext next = findContextInbound();
-        if (next != null) {
-            EventExecutor executor = next.executor();
-            if (executor.inEventLoop()) {
-                next.invokeChannelActive();
-            } else {
-                Runnable task = next.invokeChannelActiveTask;
-                if (task == null) {
-                    next.invokeChannelActiveTask = task = new Runnable() {
-                        @Override
-                        public void run() {
-                            next.invokeChannelActive();
-                        }
-                    };
-                }
-                executor.execute(task);
+        EventExecutor executor = next.executor();
+        if (executor.inEventLoop()) {
+            next.invokeChannelActive();
+        } else {
+            Runnable task = next.invokeChannelActiveTask;
+            if (task == null) {
+                next.invokeChannelActiveTask = task = new Runnable() {
+                    @Override
+                    public void run() {
+                        next.invokeChannelActive();
+                    }
+                };
             }
+            executor.execute(task);
         }
     }
 
@@ -1019,22 +892,20 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
     @Override
     public void fireChannelInactive() {
         final DefaultChannelHandlerContext next = findContextInbound();
-        if (next != null) {
-            EventExecutor executor = next.executor();
-            if (prev != null && executor.inEventLoop()) {
-                next.invokeChannelInactive();
-            } else {
-                Runnable task = next.invokeChannelInactiveTask;
-                if (task == null) {
-                    next.invokeChannelInactiveTask = task = new Runnable() {
-                        @Override
-                        public void run() {
-                            next.invokeChannelInactive();
-                        }
-                    };
-                }
-                executor.execute(task);
+        EventExecutor executor = next.executor();
+        if (prev != null && executor.inEventLoop()) {
+            next.invokeChannelInactive();
+        } else {
+            Runnable task = next.invokeChannelInactiveTask;
+            if (task == null) {
+                next.invokeChannelInactiveTask = task = new Runnable() {
+                    @Override
+                    public void run() {
+                        next.invokeChannelInactive();
+                    }
+                };
             }
+            executor.execute(task);
         }
     }
 
@@ -1055,30 +926,23 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
         }
 
         final DefaultChannelHandlerContext next = this.next;
-        if (next != null) {
-            EventExecutor executor = next.executor();
-            if (prev != null && executor.inEventLoop()) {
-                next.invokeExceptionCaught(cause);
-            } else {
-                try {
-                    executor.execute(new Runnable() {
-                        @Override
-                        public void run() {
-                            next.invokeExceptionCaught(cause);
-                        }
-                    });
-                } catch (Throwable t) {
-                    if (logger.isWarnEnabled()) {
-                        logger.warn("Failed to submit an exceptionCaught() event.", t);
-                        logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
+        EventExecutor executor = next.executor();
+        if (prev != null && executor.inEventLoop()) {
+            next.invokeExceptionCaught(cause);
+        } else {
+            try {
+                executor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        next.invokeExceptionCaught(cause);
                     }
+                });
+            } catch (Throwable t) {
+                if (logger.isWarnEnabled()) {
+                    logger.warn("Failed to submit an exceptionCaught() event.", t);
+                    logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
                 }
             }
-        } else {
-            logger.warn(
-                    "An exceptionCaught() event was fired, and it reached at the end of the " +
-                            "pipeline.  It usually means the last inbound handler in the pipeline did not " +
-                            "handle the exception.", cause);
         }
     }
 
@@ -1103,18 +967,16 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
         }
 
         final DefaultChannelHandlerContext next = this.next;
-        if (next != null) {
-            EventExecutor executor = next.executor();
-            if (executor.inEventLoop()) {
-                next.invokeUserEventTriggered(event);
-            } else {
-                executor.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        next.invokeUserEventTriggered(event);
-                    }
-                });
-            }
+        EventExecutor executor = next.executor();
+        if (executor.inEventLoop()) {
+            next.invokeUserEventTriggered(event);
+        } else {
+            executor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    next.invokeUserEventTriggered(event);
+                }
+            });
         }
     }
 
@@ -1149,7 +1011,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
 
     private void fireInboundBufferUpdated0() {
         final DefaultChannelHandlerContext next = findContextInbound();
-        if (next != null && !next.isInboundBufferFreed()) {
+        if (!next.isInboundBufferFreed()) {
             next.fillBridge();
             // This comparison is safe because this method is always executed from the executor.
             if (next.executor == executor) {
@@ -1191,28 +1053,26 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
     }
 
     @Override
-    public void fireInboundBufferSuspended() {
+    public void fireChannelReadSuspended() {
         final DefaultChannelHandlerContext next = findContextInbound();
-        if (next != null) {
-            EventExecutor executor = next.executor();
-            if (prev != null && executor.inEventLoop()) {
-                next.invokeInboundBufferSuspended();
-            } else {
-                Runnable task = next.invokeInboundBufferSuspendedTask;
-                if (task == null) {
-                    next.invokeInboundBufferSuspendedTask = task = new Runnable() {
-                        @Override
-                        public void run() {
-                            next.invokeInboundBufferSuspended();
-                        }
-                    };
-                }
-                executor.execute(task);
+        EventExecutor executor = next.executor();
+        if (prev != null && executor.inEventLoop()) {
+            next.invokeChannelReadSuspended();
+        } else {
+            Runnable task = next.invokeChannelReadSuspendedTask;
+            if (task == null) {
+                next.invokeChannelReadSuspendedTask = task = new Runnable() {
+                    @Override
+                    public void run() {
+                        next.invokeChannelReadSuspended();
+                    }
+                };
             }
+            executor.execute(task);
         }
     }
 
-    private void invokeInboundBufferSuspended() {
+    private void invokeChannelReadSuspended() {
         try {
             ((ChannelStateHandler) handler()).channelReadSuspended(this);
         } catch (Throwable t) {
@@ -1701,12 +1561,12 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
             }
         }
 
-        DefaultChannelHandlerContext nextCtx = findContextInbound();
-        if (nextCtx != null) {
+        if (next != null) {
+            DefaultChannelHandlerContext nextCtx = findContextInbound();
             nextCtx.invokeFreeInboundBuffer();
         } else {
             // Freed all inbound buffers. Free all outbound buffers in a reverse order.
-            pipeline.tail.findContextOutbound().invokeFreeOutboundBuffer();
+            findContextOutbound().invokeFreeOutboundBuffer();
         }
     }
 
@@ -1741,9 +1601,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
             }
         }
 
-        DefaultChannelHandlerContext nextCtx = findContextOutbound();
-        if (nextCtx != null) {
-            nextCtx.invokeFreeOutboundBuffer();
+        if (prev != null) {
+            findContextOutbound().invokeFreeOutboundBuffer();
         }
     }
 
@@ -1790,7 +1649,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
         DefaultChannelHandlerContext ctx = this;
         do {
             ctx = ctx.next;
-        } while (ctx != null && !(ctx.handler() instanceof ChannelStateHandler));
+        } while (!(ctx.handler() instanceof ChannelStateHandler));
         return ctx;
     }
 
@@ -1798,7 +1657,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
         DefaultChannelHandlerContext ctx = this;
         do {
             ctx = ctx.prev;
-        } while (ctx != null && !(ctx.handler() instanceof ChannelOperationHandler));
+        } while (!(ctx.handler() instanceof ChannelOperationHandler));
         return ctx;
     }
 
diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
index 465fa6fe34..8dc1f70927 100755
--- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
+++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
@@ -15,6 +15,7 @@
  */
 package io.netty.channel;
 
+import io.netty.buffer.Buf;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Freeable;
 import io.netty.buffer.MessageBuf;
@@ -56,7 +57,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
     final Map childExecutors =
             new IdentityHashMap();
 
-    private static final TailHandler TAIL_HANDLER = new TailHandler();
     volatile boolean inboundBufferFreed;
     volatile boolean outboundBufferFreed;
 
@@ -66,7 +66,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
         }
         this.channel = channel;
 
-        tail = new DefaultChannelHandlerContext(this, null, generateName(TAIL_HANDLER), TAIL_HANDLER);
+        TailHandler tailHandler = new TailHandler();
+        tail = new DefaultChannelHandlerContext(this, generateName(tailHandler), tailHandler);
 
         HeadHandler headHandler;
         switch (channel.metadata().bufferType()) {
@@ -80,7 +81,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
             throw new Error("unknown buffer type: " + channel.metadata().bufferType());
         }
 
-        head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler, true);
+        head = new DefaultChannelHandlerContext(this, generateName(headHandler), headHandler);
 
         head.next = tail;
         tail.prev = head;
@@ -583,8 +584,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
 
     private static void callBeforeAdd(ChannelHandlerContext ctx) {
         ChannelHandler handler = ctx.handler();
-        if (handler instanceof ChannelStateHandlerAdapter) {
-            ChannelStateHandlerAdapter h = (ChannelStateHandlerAdapter) handler;
+        if (handler instanceof ChannelHandlerAdapter) {
+            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
             if (!h.isSharable() && h.added) {
                 throw new ChannelPipelineException(
                         h.getClass().getName()  +
@@ -904,8 +905,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
     }
 
     @Override
-    public void fireInboundBufferSuspended() {
-        head.fireInboundBufferSuspended();
+    public void fireChannelReadSuspended() {
+        head.fireChannelReadSuspended();
         if (channel.config().isAutoRead()) {
             read();
         }
@@ -1075,22 +1076,110 @@ final class DefaultChannelPipeline implements ChannelPipeline {
         }
     }
 
-    private static final class TailHandler extends ChannelInboundMessageHandlerAdapter {
-        public TailHandler() {
-            super(Freeable.class);
+    // A special catch-all handler that handles both bytes and messages.
+    static final class TailHandler implements ChannelInboundHandler {
+
+        final ByteBuf byteSink = Unpooled.buffer(0);
+        final MessageBuf msgSink = Unpooled.messageBuffer(0);
+
+        @Override
+        public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
+
+        @Override
+        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
+
+        @Override
+        public void channelActive(ChannelHandlerContext ctx) throws Exception { }
+
+        @Override
+        public void channelInactive(ChannelHandlerContext ctx) throws Exception { }
+
+        @Override
+        public void channelReadSuspended(ChannelHandlerContext ctx) throws Exception { }
+
+        @Override
+        public void beforeAdd(ChannelHandlerContext ctx) throws Exception { }
+
+        @Override
+        public void afterAdd(ChannelHandlerContext ctx) throws Exception { }
+
+        @Override
+        public void beforeRemove(ChannelHandlerContext ctx) throws Exception { }
+
+        @Override
+        public void afterRemove(ChannelHandlerContext ctx) throws Exception { }
+
+        @Override
+        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+            logger.warn(
+                    "An exceptionCaught() event was fired, and it reached at the end of the pipeline. " +
+                            "It usually means the last handler in the pipeline did not handle the exception.", cause);
         }
 
         @Override
-        protected void messageReceived(ChannelHandlerContext ctx, Freeable msg) throws Exception {
-            if (logger.isWarnEnabled()) {
-                logger.warn("Freeable reached end-of-pipeline, call " + msg + ".free() to" +
-                        " guard against resource leakage!");
+        public Buf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
+            throw new Error();
+        }
+
+        @Override
+        public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
+            byteSink.free();
+            msgSink.free();
+        }
+
+        @Override
+        public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
+            int byteSinkSize = byteSink.readableBytes();
+            if (byteSinkSize != 0) {
+                byteSink.clear();
+                logger.warn(
+                        "Discarded {} inbound byte(s) that reached at the end of the pipeline. " +
+                        "Please check your pipeline configuration.", byteSinkSize);
+            }
+
+            int msgSinkSize = msgSink.size();
+            if (msgSinkSize != 0) {
+                MessageBuf in = msgSink;
+                for (;;) {
+                    Object m = in.poll();
+                    if (m == null) {
+                        break;
+                    }
+
+                    if (m instanceof Freeable) {
+                        ((Freeable) m).free();
+                    }
+                }
+                logger.warn(
+                        "Discarded {} inbound message(s) that reached at the end of the pipeline. " +
+                        "Please check your pipeline configuration.", msgSinkSize);
             }
-            msg.free();
         }
     }
 
-    private abstract class HeadHandler implements ChannelOutboundHandler {
+    abstract class HeadHandler implements ChannelOutboundHandler {
+
+        ByteBuf byteSink;
+        MessageBuf msgSink;
+
+        void init(ChannelHandlerContext ctx) {
+            switch (ctx.channel().metadata().bufferType()) {
+            case BYTE:
+                byteSink = ctx.alloc().ioBuffer();
+                msgSink = Unpooled.messageBuffer(0);
+                break;
+            case MESSAGE:
+                byteSink = Unpooled.buffer(0);
+                msgSink = Unpooled.messageBuffer();
+                break;
+            default:
+                throw new Error();
+            }
+        }
+
         @Override
         public final void beforeAdd(ChannelHandlerContext ctx) throws Exception {
             // NOOP
@@ -1146,11 +1235,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
             unsafe.beginRead();
         }
 
-        @Override
-        public final void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
-            unsafe.flush(promise);
-        }
-
         @Override
         public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
             ctx.fireExceptionCaught(cause);
@@ -1166,36 +1250,54 @@ final class DefaultChannelPipeline implements ChannelPipeline {
                 ChannelHandlerContext ctx, FileRegion region, ChannelPromise promise) throws Exception {
             unsafe.sendFile(region, promise);
         }
-    }
 
-    private final class ByteHeadHandler extends HeadHandler implements ChannelOutboundByteHandler {
         @Override
-        public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
-            return ctx.alloc().ioBuffer();
+        public final Buf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
+            throw new Error();
         }
 
         @Override
-        public void discardOutboundReadBytes(ChannelHandlerContext ctx) throws Exception {
-            if (ctx.hasOutboundByteBuffer()) {
-                ctx.outboundByteBuffer().discardSomeReadBytes();
+        public final void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
+            msgSink.free();
+            byteSink.free();
+        }
+    }
+
+    private final class ByteHeadHandler extends HeadHandler {
+        @Override
+        public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+            int msgSinkSize = msgSink.size();
+            if (msgSinkSize != 0) {
+                MessageBuf in = msgSink;
+                for (;;) {
+                    Object m = in.poll();
+                    if (m == null) {
+                        break;
+                    }
+
+                    if (m instanceof Freeable) {
+                        ((Freeable) m).free();
+                    }
+                }
+                logger.warn(
+                        "Discarded {} outbound message(s) that reached at the end of the pipeline. " +
+                                "Please check your pipeline configuration.", msgSinkSize);
             }
-        }
-
-        @Override
-        public void freeOutboundBuffer(ChannelHandlerContext ctx) {
-            ctx.outboundByteBuffer().free();
+            unsafe.flush(promise);
         }
     }
 
-    private final class MessageHeadHandler extends HeadHandler implements ChannelOutboundMessageHandler {
+    private final class MessageHeadHandler extends HeadHandler {
         @Override
-        public MessageBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
-            return Unpooled.messageBuffer();
-        }
-
-        @Override
-        public void freeOutboundBuffer(ChannelHandlerContext ctx) {
-            ctx.outboundMessageBuffer().free();
+        public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+            int byteSinkSize = byteSink.readableBytes();
+            if (byteSinkSize != 0) {
+                byteSink.clear();
+                logger.warn(
+                        "Discarded {} outbound byte(s) that reached at the end of the pipeline. " +
+                                "Please check your pipeline configuration.", byteSinkSize);
+            }
+            unsafe.flush(promise);
         }
     }
 }
diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java
index bf159b2630..ce5ff157a0 100755
--- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java
+++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java
@@ -222,7 +222,7 @@ public class LocalChannel extends AbstractChannel {
         }
 
         pipeline.fireInboundBufferUpdated();
-        pipeline.fireInboundBufferSuspended();
+        pipeline.fireChannelReadSuspended();
     }
 
     @Override
@@ -259,7 +259,7 @@ public class LocalChannel extends AbstractChannel {
         if (peer.readInProgress) {
             peer.readInProgress = false;
             peerPipeline.fireInboundBufferUpdated();
-            peerPipeline.fireInboundBufferSuspended();
+            peerPipeline.fireChannelReadSuspended();
         }
     }
 
diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java
index 04fa728c05..70d21f6f27 100755
--- a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java
+++ b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java
@@ -144,7 +144,7 @@ public class LocalServerChannel extends AbstractServerChannel {
         }
 
         pipeline.fireInboundBufferUpdated();
-        pipeline.fireInboundBufferSuspended();
+        pipeline.fireChannelReadSuspended();
     }
 
     LocalChannel serve(final LocalChannel peer) {
@@ -160,7 +160,7 @@ public class LocalServerChannel extends AbstractServerChannel {
             if (acceptInProgress) {
                 acceptInProgress = false;
                 pipeline.fireInboundBufferUpdated();
-                pipeline.fireInboundBufferSuspended();
+                pipeline.fireChannelReadSuspended();
             }
         } else {
             eventLoop().execute(new Runnable() {
diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java
index 0ce1f388ef..a987f3eb64 100755
--- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java
+++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java
@@ -65,7 +65,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
             final ByteBuf byteBuf = pipeline.inboundByteBuffer();
             boolean closed = false;
             boolean read = false;
-            boolean firedInboundBufferSuspended = false;
+            boolean firedChannelReadSuspended = false;
             try {
                 expandReadBuffer(byteBuf);
                 loop: for (;;) {
@@ -106,8 +106,8 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
                 if (t instanceof IOException) {
                     closed = true;
                 } else if (!closed) {
-                    firedInboundBufferSuspended = true;
-                    pipeline.fireInboundBufferSuspended();
+                    firedChannelReadSuspended = true;
+                    pipeline.fireChannelReadSuspended();
                 }
                 pipeline().fireExceptionCaught(t);
             } finally {
@@ -124,8 +124,8 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
                             close(voidFuture());
                         }
                     }
-                } else if (!firedInboundBufferSuspended) {
-                    pipeline.fireInboundBufferSuspended();
+                } else if (!firedChannelReadSuspended) {
+                    pipeline.fireChannelReadSuspended();
                 }
             }
         }
diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java
index c5117d0d67..7afe047214 100755
--- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java
+++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java
@@ -55,7 +55,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
             final MessageBuf msgBuf = pipeline.inboundMessageBuffer();
             boolean closed = false;
             boolean read = false;
-            boolean firedInboundBufferSuspended = false;
+            boolean firedChannelReadSuspended = false;
             try {
                 for (;;) {
                     int localReadAmount = doReadMessages(msgBuf);
@@ -77,8 +77,8 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
                 if (t instanceof IOException) {
                     closed = true;
                 } else if (!closed) {
-                    firedInboundBufferSuspended = true;
-                    pipeline.fireInboundBufferSuspended();
+                    firedChannelReadSuspended = true;
+                    pipeline.fireChannelReadSuspended();
                 }
 
                 pipeline().fireExceptionCaught(t);
@@ -88,8 +88,8 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
                 }
                 if (closed && isOpen()) {
                     close(voidFuture());
-                } else if (!firedInboundBufferSuspended) {
-                    pipeline.fireInboundBufferSuspended();
+                } else if (!firedChannelReadSuspended) {
+                    pipeline.fireChannelReadSuspended();
                 }
             }
         }
diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java
index 8992048db0..933a657b24 100755
--- a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java
+++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java
@@ -126,7 +126,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
                 pipeline.fireExceptionCaught(t);
             } else {
                 firedInboundBufferSuspeneded = true;
-                pipeline.fireInboundBufferSuspended();
+                pipeline.fireChannelReadSuspended();
                 pipeline.fireExceptionCaught(t);
                 unsafe().close(unsafe().voidFuture());
             }
@@ -144,7 +144,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
                     }
                 }
             } else if (!firedInboundBufferSuspeneded) {
-                pipeline.fireInboundBufferSuspended();
+                pipeline.fireChannelReadSuspended();
             }
         }
     }
diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java
index cd55ecf0e9..f231f46871 100755
--- a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java
+++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java
@@ -39,7 +39,7 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
         final MessageBuf msgBuf = pipeline.inboundMessageBuffer();
         boolean closed = false;
         boolean read = false;
-        boolean firedInboundBufferSuspended = false;
+        boolean firedChannelReadSuspended = false;
         try {
             int localReadAmount = doReadMessages(msgBuf);
             if (localReadAmount > 0) {
@@ -52,8 +52,8 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
                 read = false;
                 pipeline.fireInboundBufferUpdated();
             }
-            firedInboundBufferSuspended = true;
-            pipeline.fireInboundBufferSuspended();
+            firedChannelReadSuspended = true;
+            pipeline.fireChannelReadSuspended();
             pipeline.fireExceptionCaught(t);
             if (t instanceof IOException) {
                 unsafe().close(unsafe().voidFuture());
@@ -62,8 +62,8 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
             if (read) {
                 pipeline.fireInboundBufferUpdated();
             }
-            if (!firedInboundBufferSuspended) {
-                pipeline.fireInboundBufferSuspended();
+            if (!firedChannelReadSuspended) {
+                pipeline.fireChannelReadSuspended();
             }
             if (closed && isOpen()) {
                 unsafe().close(unsafe().voidFuture());
diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java
index cb0cbb2b11..efcdfe146d 100755
--- a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java
+++ b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java
@@ -185,7 +185,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
             channel.pipeline().inboundMessageBuffer().add(
                     new AioSocketChannel(channel, null, ch));
             channel.pipeline().fireInboundBufferUpdated();
-            channel.pipeline().fireInboundBufferSuspended();
+            channel.pipeline().fireChannelReadSuspended();
         }
 
         @Override
diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java
index 519d18a267..0ed1ccba4e 100755
--- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java
+++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java
@@ -436,7 +436,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
 
             boolean closed = false;
             boolean read = false;
-            boolean firedInboundBufferSuspended = false;
+            boolean firedChannelReadSuspended = false;
             try {
                 int localReadAmount = result.intValue();
                 if (localReadAmount > 0) {
@@ -458,8 +458,8 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
                 }
 
                 if (!closed && channel.isOpen()) {
-                    firedInboundBufferSuspended = true;
-                    pipeline.fireInboundBufferSuspended();
+                    firedChannelReadSuspended = true;
+                    pipeline.fireChannelReadSuspended();
                 }
 
                 pipeline.fireExceptionCaught(t);
@@ -478,8 +478,8 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
                             channel.unsafe().close(channel.unsafe().voidFuture());
                         }
                     }
-                } else if (!firedInboundBufferSuspended) {
-                    pipeline.fireInboundBufferSuspended();
+                } else if (!firedChannelReadSuspended) {
+                    pipeline.fireChannelReadSuspended();
                 }
             }
         }
diff --git a/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java b/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java
index 820fd3b002..b350d8988c 100644
--- a/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java
+++ b/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java
@@ -59,11 +59,11 @@ public abstract class AbstractEventLoopTest {
         assertSame(executor, future.channel().pipeline().context(TestChannelHandler2.class).executor());
     }
 
-    private static final class TestChannelHandler extends ChannelHandlerAdapter {
+    private static final class TestChannelHandler extends ChannelDuplexHandler {
 
     }
 
-    private static final class TestChannelHandler2 extends ChannelHandlerAdapter {
+    private static final class TestChannelHandler2 extends ChannelDuplexHandler {
 
     }
 
diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java
index 7a9a3f4041..10a03c30d2 100644
--- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java
+++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java
@@ -16,6 +16,7 @@
 package io.netty.channel;
 
 
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Freeable;
 import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.local.LocalChannel;
@@ -24,10 +25,98 @@ import org.junit.Test;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.*;
 
 public class DefaultChannelPipelineTest {
+    @Test
+    public void testMessageCatchAllInboundSink() throws Exception {
+        LocalChannel channel = new LocalChannel();
+        LocalEventLoopGroup group = new LocalEventLoopGroup();
+        group.register(channel).awaitUninterruptibly();
+        final AtomicBoolean forwarded = new AtomicBoolean();
+        final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
+        pipeline.addLast(new ChannelInboundMessageHandlerAdapter() {
+            @Override
+            protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
+                forwarded.set(ctx.nextInboundMessageBuffer().add(msg));
+            }
+
+            @Override
+            protected void endMessageReceived(ChannelHandlerContext ctx) throws Exception {
+                ctx.fireInboundBufferUpdated();
+            }
+        });
+        channel.eventLoop().submit(new Runnable() {
+            @Override
+            public void run() {
+                pipeline.fireChannelActive();
+                pipeline.inboundMessageBuffer().add(new Object());
+                pipeline.fireInboundBufferUpdated();
+            }
+        }).get();
+
+        assertTrue(forwarded.get());
+    }
+
+    @Test
+    public void testByteCatchAllInboundSink() throws Exception {
+        LocalChannel channel = new LocalChannel();
+        LocalEventLoopGroup group = new LocalEventLoopGroup();
+        group.register(channel).awaitUninterruptibly();
+        final AtomicBoolean forwarded = new AtomicBoolean();
+        final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
+        pipeline.addLast(new ChannelInboundByteHandlerAdapter() {
+            @Override
+            protected void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
+                ByteBuf out = ctx.nextInboundByteBuffer();
+                out.writeBytes(in);
+                forwarded.set(true);
+                ctx.fireInboundBufferUpdated();
+            }
+        });
+        channel.eventLoop().submit(new Runnable() {
+            @Override
+            public void run() {
+                pipeline.fireChannelActive();
+                pipeline.inboundByteBuffer().writeByte(0);
+                pipeline.fireInboundBufferUpdated();
+            }
+        }).get();
+
+        assertTrue(forwarded.get());
+    }
+
+    @Test
+    public void testByteCatchAllOutboundSink() throws Exception {
+        LocalChannel channel = new LocalChannel();
+        LocalEventLoopGroup group = new LocalEventLoopGroup();
+        group.register(channel).awaitUninterruptibly();
+        final AtomicBoolean forwarded = new AtomicBoolean();
+        final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
+        pipeline.addLast(new ChannelOutboundByteHandlerAdapter() {
+            @Override
+            protected void flush(ChannelHandlerContext ctx, ByteBuf in, ChannelPromise promise) throws Exception {
+                ByteBuf out = ctx.nextOutboundByteBuffer();
+                out.writeBytes(in);
+                forwarded.set(true);
+                ctx.flush(promise);
+            }
+        });
+        channel.eventLoop().submit(new Runnable() {
+            @Override
+            public void run() {
+                pipeline.fireChannelActive();
+                pipeline.outboundByteBuffer().writeByte(0);
+                pipeline.flush();
+            }
+        }).get();
+
+        Thread.sleep(1000);
+        assertTrue(forwarded.get());
+    }
+
     @Test
     public void testFreeCalled() throws InterruptedException{
         final CountDownLatch free = new CountDownLatch(1);
@@ -242,7 +331,7 @@ public class DefaultChannelPipelineTest {
     }
 
     @Sharable
-    private static class TestHandler extends ChannelHandlerAdapter {
+    private static class TestHandler extends ChannelDuplexHandler {
         // Dummy
     }
 }
diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java
index 1b2fe82d7d..9689c414f8 100644
--- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java
+++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java
@@ -20,7 +20,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.MessageBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerAdapter;
+import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundByteHandler;
 import io.netty.channel.ChannelInboundMessageHandler;
@@ -331,7 +331,7 @@ public class LocalTransportThreadModelTest {
     }
 
     private static class ThreadNameAuditor
-            extends ChannelHandlerAdapter
+            extends ChannelDuplexHandler
             implements ChannelInboundMessageHandler,
                        ChannelOutboundMessageHandler {
 
@@ -394,7 +394,7 @@ public class LocalTransportThreadModelTest {
      * Converts integers into a binary stream.
      */
     private static class MessageForwarder1
-            extends ChannelHandlerAdapter
+            extends ChannelDuplexHandler
             implements ChannelInboundMessageHandler, ChannelOutboundByteHandler {
 
         private final AtomicReference exception = new AtomicReference();
@@ -502,7 +502,7 @@ public class LocalTransportThreadModelTest {
      * Converts a binary stream into integers.
      */
     private static class MessageForwarder2
-            extends ChannelHandlerAdapter
+            extends ChannelDuplexHandler
             implements ChannelInboundByteHandler, ChannelOutboundMessageHandler {
 
         private final AtomicReference exception = new AtomicReference();
@@ -605,7 +605,7 @@ public class LocalTransportThreadModelTest {
      * Simply forwards the received object to the next handler.
      */
     private static class MessageForwarder3
-            extends ChannelHandlerAdapter
+            extends ChannelDuplexHandler
             implements ChannelInboundMessageHandler, ChannelOutboundMessageHandler {
 
         private final AtomicReference exception = new AtomicReference();
@@ -695,7 +695,7 @@ public class LocalTransportThreadModelTest {
      * Discards all received messages.
      */
     private static class MessageDiscarder
-            extends ChannelHandlerAdapter
+            extends ChannelDuplexHandler
             implements ChannelInboundMessageHandler, ChannelOutboundMessageHandler {
 
         private final AtomicReference exception = new AtomicReference();