From 026715e8189f2bad22c3143eec220594bf18479d Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Tue, 29 May 2012 12:09:29 -0700 Subject: [PATCH] Refactor the pipeline API to support stacked codecs - Previous API did not support the pipeline which contains multiple MessageToStreamEncoders because there was no way to find the closest outbound byte buffer. Now you always get the correct buffer even if the handler that provides the buffer is placed distantly. For example: Channel -> MsgAEncoder -> MsgBEncoder -> MsgCEncoder Msg(A|B|C)Encoder will all have access to the channel's outbound byte buffer. Previously, it was simply impossible. - Improved ChannelBufferHolder.toString() --- .../codec/spdy/SpdySessionHandler.java | 8 +- .../spdy/AbstractSocketSpdyEchoTest.java | 2 +- .../codec/MessageToMessageDecoder.java | 4 +- .../codec/MessageToMessageEncoder.java | 20 +--- .../handler/codec/MessageToStreamEncoder.java | 4 +- .../netty/handler/codec/ReplayingDecoder.java | 4 +- .../handler/codec/StreamToMessageDecoder.java | 12 +- .../handler/codec/StreamToStreamDecoder.java | 8 +- .../handler/codec/StreamToStreamEncoder.java | 4 +- .../codec/embedder/DecoderEmbedder.java | 2 +- .../example/discard/DiscardClientHandler.java | 5 +- .../example/discard/DiscardServerHandler.java | 2 +- .../netty/example/echo/EchoClientHandler.java | 4 +- .../netty/example/echo/EchoServerHandler.java | 4 +- .../factorial/FactorialClientHandler.java | 3 +- .../netty/handler/logging/LoggingHandler.java | 4 +- .../io/netty/channel/AbstractChannel.java | 4 +- .../netty/channel/AbstractServerChannel.java | 2 +- .../main/java/io/netty/channel/Channel.java | 2 + .../io/netty/channel/ChannelBufferHolder.java | 33 +++--- .../netty/channel/ChannelHandlerContext.java | 8 ++ .../channel/ChannelInboundHandlerAdapter.java | 12 +- .../channel/ChannelInboundHandlerContext.java | 2 +- .../netty/channel/ChannelInboundInvoker.java | 2 - .../ChannelInboundMessageHandlerAdapter.java | 4 +- .../ChannelOutboundHandlerAdapter.java | 12 +- .../ChannelOutboundHandlerContext.java | 2 +- .../netty/channel/ChannelOutboundInvoker.java | 2 - .../io/netty/channel/ChannelPipeline.java | 3 + .../netty/channel/DefaultChannelPipeline.java | 106 +++++++++++++----- .../netty/channel/ServerChannelBootstrap.java | 2 +- .../socket/nio/AbstractNioMessageChannel.java | 2 +- .../socket/nio/AbstractNioStreamChannel.java | 2 +- .../socket/oio/AbstractOioMessageChannel.java | 2 +- .../socket/oio/AbstractOioStreamChannel.java | 2 +- 35 files changed, 171 insertions(+), 123 deletions(-) diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java index 3b5eff09ba..35ef155f40 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java @@ -78,7 +78,7 @@ public class SpdySessionHandler extends ChannelHandlerAdapter { @Override public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { - Queue in = ctx.in().messageBuffer(); + Queue in = ctx.inbound().messageBuffer(); for (;;) { Object msg = in.poll(); if (msg == null) { @@ -272,7 +272,7 @@ public class SpdySessionHandler extends ChannelHandlerAdapter { } } - ctx.nextIn().messageBuffer().add(msg); + ctx.nextInboundMessageBuffer().add(msg); } @Override @@ -303,7 +303,7 @@ public class SpdySessionHandler extends ChannelHandlerAdapter { public void flush(ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { - Queue in = ctx.prevOut().messageBuffer(); + Queue in = ctx.outbound().messageBuffer(); for (;;) { Object msg = in.poll(); if (msg == null) { @@ -394,7 +394,7 @@ public class SpdySessionHandler extends ChannelHandlerAdapter { } } - ctx.out().messageBuffer().add(msg); + ctx.nextOutboundMessageBuffer().add(msg); } /* diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/AbstractSocketSpdyEchoTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/AbstractSocketSpdyEchoTest.java index ba1b38df92..464bbf130e 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/spdy/AbstractSocketSpdyEchoTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/AbstractSocketSpdyEchoTest.java @@ -248,7 +248,7 @@ public abstract class AbstractSocketSpdyEchoTest { @Override public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { - ChannelBuffer m = ctx.in().byteBuffer().readBytes(ctx.in().byteBuffer().readableBytes()); + ChannelBuffer m = ctx.inbound().byteBuffer().readBytes(ctx.inbound().byteBuffer().readableBytes()); byte[] actual = new byte[m.readableBytes()]; m.getBytes(0, actual); diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java index 348ff6ad0e..efde1a8a71 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java @@ -19,7 +19,7 @@ public abstract class MessageToMessageDecoder extends ChannelInboundHandle @Override public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { - Queue in = ctx.in().messageBuffer(); + Queue in = ctx.inbound().messageBuffer(); boolean decoded = false; for (;;) { try { @@ -35,7 +35,7 @@ public abstract class MessageToMessageDecoder extends ChannelInboundHandle continue; } - if (unfoldAndAdd(ctx, ctx.nextIn(), emsg)) { + if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), emsg)) { decoded = true; } } catch (Throwable t) { diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java index 0a53086591..e8da448598 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java @@ -1,6 +1,5 @@ package io.netty.handler.codec; -import io.netty.buffer.ChannelBuffer; import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelFuture; @@ -20,7 +19,7 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundHandl @Override public void flush(ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { - Queue in = ctx.prevOut().messageBuffer(); + Queue in = ctx.outbound().messageBuffer(); boolean encoded = false; for (;;) { try { @@ -36,7 +35,7 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundHandl continue; } - if (unfoldAndAdd(ctx, ctx.out(), emsg)) { + if (unfoldAndAdd(ctx, ctx.nextOutboundMessageBuffer(), emsg)) { encoded = true; } } catch (Throwable t) { @@ -56,7 +55,7 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundHandl public abstract O encode(ChannelOutboundHandlerContext ctx, I msg) throws Exception; static boolean unfoldAndAdd( - ChannelHandlerContext ctx, ChannelBufferHolder dst, Object msg) throws Exception { + ChannelHandlerContext ctx, Queue dst, Object msg) throws Exception { if (msg == null) { return false; } @@ -94,18 +93,7 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundHandl return added; } - if (dst.hasMessageBuffer()) { - dst.messageBuffer().add(msg); - } else if (msg instanceof ChannelBuffer) { - ChannelBuffer buf = (ChannelBuffer) msg; - if (!buf.readable()) { - return false; - } - dst.byteBuffer().writeBytes(buf, buf.readerIndex(), buf.readableBytes()); - } else { - throw new UnsupportedMessageTypeException(msg, ChannelBuffer.class); - } - + dst.add(msg); return true; } } diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToStreamEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToStreamEncoder.java index 31a19cb538..db2a88d607 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToStreamEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToStreamEncoder.java @@ -19,8 +19,8 @@ public abstract class MessageToStreamEncoder extends ChannelOutboundHandlerAd @Override public void flush(ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { - Queue in = ctx.prevOut().messageBuffer(); - ChannelBuffer out = ctx.out().byteBuffer(); + Queue in = ctx.outbound().messageBuffer(); + ChannelBuffer out = ctx.nextOutboundByteBuffer(); int oldOutSize = out.readableBytes(); for (;;) { diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java index 5394c5cd7d..b7607776e3 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java @@ -378,7 +378,7 @@ public abstract class ReplayingDecoder> extends StreamToMes } try { - if (unfoldAndAdd(ctx, ctx.nextIn(), decodeLast(ctx, replayable))) { + if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), decodeLast(ctx, replayable))) { fireInboundBufferUpdated(ctx, in); } } catch (Signal replay) { @@ -442,7 +442,7 @@ public abstract class ReplayingDecoder> extends StreamToMes } // A successful decode - if (unfoldAndAdd(ctx, ctx.nextIn(), result)) { + if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), result)) { decoded = true; } } catch (Throwable t) { diff --git a/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java index 04248ce398..74da6c515b 100644 --- a/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java @@ -27,13 +27,13 @@ public abstract class StreamToMessageDecoder extends ChannelInboundHandlerAda @Override public void channelInactive(ChannelInboundHandlerContext ctx) throws Exception { - ChannelBuffer in = ctx.in().byteBuffer(); + ChannelBuffer in = ctx.inbound().byteBuffer(); if (in.readable()) { callDecode(ctx); } try { - if (unfoldAndAdd(ctx, ctx.nextIn(), decodeLast(ctx, in))) { + if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), decodeLast(ctx, in))) { in.discardReadBytes(); ctx.fireInboundBufferUpdated(); } @@ -49,7 +49,7 @@ public abstract class StreamToMessageDecoder extends ChannelInboundHandlerAda } protected void callDecode(ChannelInboundHandlerContext ctx) { - ChannelBuffer in = ctx.in().byteBuffer(); + ChannelBuffer in = ctx.inbound().byteBuffer(); boolean decoded = false; for (;;) { @@ -69,7 +69,7 @@ public abstract class StreamToMessageDecoder extends ChannelInboundHandlerAda } } - if (unfoldAndAdd(ctx, ctx.nextIn(), o)) { + if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), o)) { decoded = true; } else { break; @@ -109,10 +109,10 @@ public abstract class StreamToMessageDecoder extends ChannelInboundHandlerAda // the new handler. ctx.pipeline().addAfter(ctx.name(), newHandlerName, newHandler); - ChannelBuffer in = ctx.in().byteBuffer(); + ChannelBuffer in = ctx.inbound().byteBuffer(); try { if (in.readable()) { - ctx.nextIn().byteBuffer().writeBytes(ctx.in().byteBuffer()); + ctx.nextInboundByteBuffer().writeBytes(ctx.inbound().byteBuffer()); ctx.fireInboundBufferUpdated(); } } finally { diff --git a/codec/src/main/java/io/netty/handler/codec/StreamToStreamDecoder.java b/codec/src/main/java/io/netty/handler/codec/StreamToStreamDecoder.java index 219b9fea5b..c28d845943 100644 --- a/codec/src/main/java/io/netty/handler/codec/StreamToStreamDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/StreamToStreamDecoder.java @@ -21,12 +21,12 @@ public abstract class StreamToStreamDecoder extends ChannelInboundHandlerAdapter @Override public void channelInactive(ChannelInboundHandlerContext ctx) throws Exception { - ChannelBuffer in = ctx.in().byteBuffer(); + ChannelBuffer in = ctx.inbound().byteBuffer(); if (!in.readable()) { callDecode(ctx); } - ChannelBuffer out = ctx.nextIn().byteBuffer(); + ChannelBuffer out = ctx.nextInboundByteBuffer(); int oldOutSize = out.readableBytes(); try { decodeLast(ctx, in, out); @@ -47,8 +47,8 @@ public abstract class StreamToStreamDecoder extends ChannelInboundHandlerAdapter } private void callDecode(ChannelInboundHandlerContext ctx) { - ChannelBuffer in = ctx.in().byteBuffer(); - ChannelBuffer out = ctx.nextIn().byteBuffer(); + ChannelBuffer in = ctx.inbound().byteBuffer(); + ChannelBuffer out = ctx.nextInboundByteBuffer(); int oldOutSize = out.readableBytes(); while (in.readable()) { diff --git a/codec/src/main/java/io/netty/handler/codec/StreamToStreamEncoder.java b/codec/src/main/java/io/netty/handler/codec/StreamToStreamEncoder.java index 38e4987613..8894a22010 100644 --- a/codec/src/main/java/io/netty/handler/codec/StreamToStreamEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/StreamToStreamEncoder.java @@ -17,8 +17,8 @@ public abstract class StreamToStreamEncoder extends ChannelOutboundHandlerAdapte @Override public void flush(ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { - ChannelBuffer in = ctx.prevOut().byteBuffer(); - ChannelBuffer out = ctx.out().byteBuffer(); + ChannelBuffer in = ctx.outbound().byteBuffer(); + ChannelBuffer out = ctx.nextOutboundByteBuffer(); int oldOutSize = out.readableBytes(); while (in.readable()) { diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/DecoderEmbedder.java b/codec/src/main/java/io/netty/handler/codec/embedder/DecoderEmbedder.java index ec4892bfbb..b38f905b88 100644 --- a/codec/src/main/java/io/netty/handler/codec/embedder/DecoderEmbedder.java +++ b/codec/src/main/java/io/netty/handler/codec/embedder/DecoderEmbedder.java @@ -57,7 +57,7 @@ public class DecoderEmbedder extends AbstractCodecEmbedder { @Override public boolean offer(Object input) { - ChannelBufferHolder in = pipeline().nextIn(); + ChannelBufferHolder in = pipeline().inbound(); if (in.hasByteBuffer()) { in.byteBuffer().writeBytes((ChannelBuffer) input); } else { diff --git a/example/src/main/java/io/netty/example/discard/DiscardClientHandler.java b/example/src/main/java/io/netty/example/discard/DiscardClientHandler.java index bdcfe1ccd6..fb4f836135 100644 --- a/example/src/main/java/io/netty/example/discard/DiscardClientHandler.java +++ b/example/src/main/java/io/netty/example/discard/DiscardClientHandler.java @@ -34,7 +34,6 @@ public class DiscardClientHandler extends ChannelInboundStreamHandlerAdapter { private final byte[] content; private ChannelInboundHandlerContext ctx; - private ChannelBuffer out; public DiscardClientHandler(int messageSize) { if (messageSize <= 0) { @@ -49,7 +48,6 @@ public class DiscardClientHandler extends ChannelInboundStreamHandlerAdapter { public void channelActive(ChannelInboundHandlerContext ctx) throws Exception { this.ctx = ctx; - out = ctx.out().byteBuffer(); // Send the initial messages. generateTraffic(); } @@ -77,6 +75,7 @@ public class DiscardClientHandler extends ChannelInboundStreamHandlerAdapter { private void generateTraffic() { // Fill the outbound buffer up to 64KiB + ChannelBuffer out = ctx.nextOutboundByteBuffer(); while (out.readableBytes() < 65536) { out.writeBytes(content); } @@ -90,7 +89,7 @@ public class DiscardClientHandler extends ChannelInboundStreamHandlerAdapter { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { - out.clear(); + ctx.nextOutboundByteBuffer().discardReadBytes(); generateTraffic(); } } diff --git a/example/src/main/java/io/netty/example/discard/DiscardServerHandler.java b/example/src/main/java/io/netty/example/discard/DiscardServerHandler.java index 8c3167013f..d617fe16c9 100644 --- a/example/src/main/java/io/netty/example/discard/DiscardServerHandler.java +++ b/example/src/main/java/io/netty/example/discard/DiscardServerHandler.java @@ -34,7 +34,7 @@ public class DiscardServerHandler extends ChannelInboundStreamHandlerAdapter { public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { // Discard the received data silently. - ctx.in().byteBuffer().clear(); + ctx.inbound().byteBuffer().clear(); } diff --git a/example/src/main/java/io/netty/example/echo/EchoClientHandler.java b/example/src/main/java/io/netty/example/echo/EchoClientHandler.java index 56a48d3c29..e84ffc9c56 100644 --- a/example/src/main/java/io/netty/example/echo/EchoClientHandler.java +++ b/example/src/main/java/io/netty/example/echo/EchoClientHandler.java @@ -62,8 +62,8 @@ public class EchoClientHandler extends ChannelInboundHandlerAdapter { @Override public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) { - ChannelBuffer in = ctx.in().byteBuffer(); - ChannelBuffer out = ctx.out().byteBuffer(); + ChannelBuffer in = ctx.inbound().byteBuffer(); + ChannelBuffer out = ctx.nextOutboundByteBuffer(); out.discardReadBytes(); out.writeBytes(in); in.discardReadBytes(); diff --git a/example/src/main/java/io/netty/example/echo/EchoServerHandler.java b/example/src/main/java/io/netty/example/echo/EchoServerHandler.java index 0b388b182b..f24c10408f 100644 --- a/example/src/main/java/io/netty/example/echo/EchoServerHandler.java +++ b/example/src/main/java/io/netty/example/echo/EchoServerHandler.java @@ -39,8 +39,8 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) { - ChannelBuffer in = ctx.in().byteBuffer(); - ChannelBuffer out = ctx.out().byteBuffer(); + ChannelBuffer in = ctx.inbound().byteBuffer(); + ChannelBuffer out = ctx.nextOutboundByteBuffer(); out.discardReadBytes(); out.writeBytes(in); in.discardReadBytes(); diff --git a/example/src/main/java/io/netty/example/factorial/FactorialClientHandler.java b/example/src/main/java/io/netty/example/factorial/FactorialClientHandler.java index 8ec3ddd633..6af06deb23 100644 --- a/example/src/main/java/io/netty/example/factorial/FactorialClientHandler.java +++ b/example/src/main/java/io/netty/example/factorial/FactorialClientHandler.java @@ -40,7 +40,6 @@ public class FactorialClientHandler extends ChannelInboundMessageHandlerAdapter< FactorialClientHandler.class.getName()); private ChannelInboundHandlerContext ctx; - private Queue out; private int i = 1; private int receivedMessages; private final int count; @@ -68,7 +67,6 @@ public class FactorialClientHandler extends ChannelInboundMessageHandlerAdapter< @Override public void channelActive(ChannelInboundHandlerContext ctx) { this.ctx = ctx; - out = ctx.out().messageBuffer(); sendNumbers(); } @@ -101,6 +99,7 @@ public class FactorialClientHandler extends ChannelInboundMessageHandlerAdapter< private void sendNumbers() { // Do not send more than 4096 numbers. boolean finished = false; + Queue out = ctx.nextOutboundMessageBuffer(); while (out.size() < 4096) { if (i <= count) { out.add(Integer.valueOf(i)); diff --git a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java index a25db516c8..18eb54f82e 100644 --- a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java +++ b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java @@ -352,7 +352,7 @@ public class LoggingHandler extends ChannelHandlerAdapter { public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { if (getLogger().isEnabled(internalLevel)) { - logger.log(internalLevel, format(ctx, formatBuffer("INBUF", ctx.in()))); + logger.log(internalLevel, format(ctx, formatBuffer("INBUF", ctx.inbound()))); } ctx.fireInboundBufferUpdated(); } @@ -407,7 +407,7 @@ public class LoggingHandler extends ChannelHandlerAdapter { public void flush(ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { if (getLogger().isEnabled(internalLevel)) { - logger.log(internalLevel, format(ctx, formatBuffer("OUTBUF", ctx.prevOut()))); + logger.log(internalLevel, format(ctx, formatBuffer("OUTBUF", ctx.outbound()))); } ctx.flush(future); } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 13d6ec0f4a..03d11ee922 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -259,8 +259,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public ChannelBufferHolder out() { - return pipeline().out(); + public ChannelBufferHolder outbound() { + return pipeline().outbound(); } @Override diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index d868b3ec5e..cb30b6a859 100644 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -37,7 +37,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S } @Override - public ChannelBufferHolder out() { + public ChannelBufferHolder outbound() { return ChannelBufferHolders.discardBuffer(); } diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index 9d36ae35a0..a1ecc993a0 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -136,6 +136,8 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu boolean isRegistered(); boolean isActive(); + ChannelBufferHolder outbound(); + /** * Returns the local address where this channel is bound to. The returned * {@link SocketAddress} is supposed to be down-cast into more concrete diff --git a/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java b/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java index 9cd17f6671..018094c35e 100644 --- a/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java +++ b/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java @@ -59,9 +59,9 @@ public final class ChannelBufferHolder { case 0: return msgBuf != null; case 1: - return ctx.nextIn().hasMessageBuffer(); + return ctx.nextInboundMessageBuffer() != null; case 2: - return ctx.out().hasMessageBuffer(); + return ctx.nextOutboundMessageBuffer() != null; default: throw new Error(); } @@ -72,15 +72,14 @@ public final class ChannelBufferHolder { case 0: return byteBuf != null; case 1: - return ctx.nextIn().hasByteBuffer(); + return ctx.nextInboundByteBuffer() != null; case 2: - return ctx.out().hasByteBuffer(); + return ctx.nextOutboundByteBuffer() != null; default: throw new Error(); } } - @SuppressWarnings("unchecked") public Queue messageBuffer() { switch (bypassDirection) { case 0: @@ -89,9 +88,9 @@ public final class ChannelBufferHolder { } return msgBuf; case 1: - return (Queue) ctx.nextIn().messageBuffer(); + return (Queue) ctx.nextInboundMessageBuffer(); case 2: - return (Queue) ctx.out().messageBuffer(); + return (Queue) ctx.nextOutboundMessageBuffer(); default: throw new Error(); } @@ -105,9 +104,9 @@ public final class ChannelBufferHolder { } return byteBuf; case 1: - return ctx.nextIn().byteBuffer(); + return ctx.nextInboundByteBuffer(); case 2: - return ctx.out().byteBuffer(); + return ctx.nextOutboundByteBuffer(); default: throw new Error(); } @@ -118,14 +117,18 @@ public final class ChannelBufferHolder { switch (bypassDirection) { case 0: if (msgBuf != null) { - return msgBuf.toString(); + if (byteBuf != null) { + return "CatchAllBuffer"; + } else { + return "MessageBuffer(" + msgBuf.size() + ')'; + } } else { return byteBuf.toString(); } case 1: - return ctx.nextIn().toString(); + return "InboundBypassBuffer"; case 2: - return ctx.out().toString(); + return "OutboundBypassBuffer"; default: throw new Error(); } @@ -140,9 +143,8 @@ public final class ChannelBufferHolder { return byteBuf.readableBytes(); } case 1: - return ctx.nextIn().size(); case 2: - return ctx.out().size(); + throw new UnsupportedOperationException(); default: throw new Error(); } @@ -157,9 +159,8 @@ public final class ChannelBufferHolder { return byteBuf.readable(); } case 1: - return ctx.nextIn().isEmpty(); case 2: - return ctx.out().isEmpty(); + throw new UnsupportedOperationException(); default: throw new Error(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java index 581d4ac832..16d7de2b1a 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java @@ -16,9 +16,11 @@ package io.netty.channel; +import io.netty.buffer.ChannelBuffer; import io.netty.util.AttributeMap; import java.nio.channels.Channels; +import java.util.Queue; /** * Enables a {@link ChannelHandler} to interact with its {@link ChannelPipeline} @@ -133,4 +135,10 @@ public interface ChannelHandlerContext boolean canHandleInbound(); boolean canHandleOutbound(); + + ChannelBuffer nextInboundByteBuffer(); + Queue nextInboundMessageBuffer(); + + ChannelBuffer nextOutboundByteBuffer(); + Queue nextOutboundMessageBuffer(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java index a95eb9e8ca..c0387e9f3d 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java @@ -61,14 +61,14 @@ public abstract class ChannelInboundHandlerAdapter implements ChannelInboundH } static void inboundBufferUpdated0(ChannelInboundHandlerContext ctx) { - if (ctx.in().isBypass()) { + if (ctx.inbound().isBypass()) { ctx.fireInboundBufferUpdated(); return; } - if (ctx.in().hasMessageBuffer()) { - Queue in = ctx.in().messageBuffer(); - Queue nextIn = ctx.nextIn().messageBuffer(); + if (ctx.inbound().hasMessageBuffer()) { + Queue in = ctx.inbound().messageBuffer(); + Queue nextIn = ctx.nextInboundMessageBuffer(); for (;;) { I msg = in.poll(); if (msg == null) { @@ -77,8 +77,8 @@ public abstract class ChannelInboundHandlerAdapter implements ChannelInboundH nextIn.add(msg); } } else { - ChannelBuffer in = ctx.in().byteBuffer(); - ChannelBuffer nextIn = ctx.nextIn().byteBuffer(); + ChannelBuffer in = ctx.inbound().byteBuffer(); + ChannelBuffer nextIn = ctx.nextInboundByteBuffer(); nextIn.writeBytes(in); in.discardReadBytes(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerContext.java b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerContext.java index 8657a23461..7537a3529b 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerContext.java @@ -1,5 +1,5 @@ package io.netty.channel; public interface ChannelInboundHandlerContext extends ChannelHandlerContext { - ChannelBufferHolder in(); + ChannelBufferHolder inbound(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java index 993b80ad91..82e1fbe701 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java @@ -2,8 +2,6 @@ package io.netty.channel; public interface ChannelInboundInvoker { - ChannelBufferHolder nextIn(); - void fireChannelRegistered(); void fireChannelUnregistered(); void fireChannelActive(); diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java index 4b56a0f919..c103677c2d 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java @@ -14,7 +14,7 @@ public class ChannelInboundMessageHandlerAdapter extends @Override public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { - Queue in = ctx.in().messageBuffer(); + Queue in = ctx.inbound().messageBuffer(); for (;;) { I msg = in.poll(); if (msg == null) { @@ -30,6 +30,6 @@ public class ChannelInboundMessageHandlerAdapter extends } public void messageReceived(ChannelInboundHandlerContext ctx, I msg) throws Exception { - ctx.nextIn().messageBuffer().add(msg); + ctx.nextInboundMessageBuffer().add(msg); } } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java index 794c6e45ab..784721364b 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java @@ -57,14 +57,14 @@ public abstract class ChannelOutboundHandlerAdapter implements ChannelOutboun } static void flush0(ChannelOutboundHandlerContext ctx, ChannelFuture future) { - if (ctx.prevOut().isBypass()) { + if (ctx.outbound().isBypass()) { ctx.flush(future); return; } - if (ctx.prevOut().hasMessageBuffer()) { - Queue out = ctx.prevOut().messageBuffer(); - Queue nextOut = ctx.out().messageBuffer(); + if (ctx.outbound().hasMessageBuffer()) { + Queue out = ctx.outbound().messageBuffer(); + Queue nextOut = ctx.nextOutboundMessageBuffer(); for (;;) { O msg = out.poll(); if (msg == null) { @@ -73,8 +73,8 @@ public abstract class ChannelOutboundHandlerAdapter implements ChannelOutboun nextOut.add(msg); } } else { - ChannelBuffer out = ctx.prevOut().byteBuffer(); - ChannelBuffer nextOut = ctx.out().byteBuffer(); + ChannelBuffer out = ctx.outbound().byteBuffer(); + ChannelBuffer nextOut = ctx.nextOutboundByteBuffer(); nextOut.writeBytes(out); out.discardReadBytes(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerContext.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerContext.java index 5b0471fbf8..25324402bd 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerContext.java @@ -2,5 +2,5 @@ package io.netty.channel; public interface ChannelOutboundHandlerContext extends ChannelHandlerContext { - ChannelBufferHolder prevOut(); + ChannelBufferHolder outbound(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java index 9e5eb33fae..0bdbeeb1c8 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java @@ -3,8 +3,6 @@ package io.netty.channel; import java.net.SocketAddress; public interface ChannelOutboundInvoker { - ChannelBufferHolder out(); - ChannelFuture bind(SocketAddress localAddress); ChannelFuture connect(SocketAddress remoteAddress); ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress); diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java index 8d2da94d08..29f0ba1a3e 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/ChannelPipeline.java @@ -205,6 +205,9 @@ import java.util.NoSuchElementException; */ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker { + ChannelBufferHolder inbound(); + ChannelBufferHolder outbound(); + /** * Inserts a {@link ChannelHandler} at the first position of this pipeline. * diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 64ff3215d0..bbb02f2494 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -27,6 +27,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Queue; /** * The default {@link ChannelPipeline} implementation. It is usually created @@ -570,19 +571,19 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelBufferHolder nextIn() { + public ChannelBufferHolder inbound() { DefaultChannelHandlerContext ctx = firstInboundContext(); if (ctx != null) { - return ctx.in(); + return ctx.inbound(); } return null; } @Override - public ChannelBufferHolder out() { + public ChannelBufferHolder outbound() { DefaultChannelHandlerContext ctx = firstOutboundContext(); if (ctx != null) { - return ctx.prevOut(); + return ctx.outbound(); } return channel().unsafe().out(); } @@ -903,11 +904,12 @@ public class DefaultChannelPipeline implements ChannelPipeline { } validateFuture(future); - if (out().hasMessageBuffer()) { - out().messageBuffer().add(message); + ChannelBufferHolder out = outbound(); + if (out.hasMessageBuffer()) { + out.messageBuffer().add(message); } else if (message instanceof ChannelBuffer) { ChannelBuffer m = (ChannelBuffer) message; - out().byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes()); + out.byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes()); } else { throw new IllegalArgumentException( "cannot write a message whose type is not " + @@ -1057,7 +1059,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { private final boolean canHandleInbound; private final boolean canHandleOutbound; private final ChannelBufferHolder in; - private final ChannelBufferHolder prevOut; + private final ChannelBufferHolder out; @SuppressWarnings("unchecked") DefaultChannelHandlerContext( @@ -1096,7 +1098,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } if (canHandleOutbound) { try { - prevOut = ((ChannelOutboundHandler) handler).newOutboundBuffer(this); + out = ((ChannelOutboundHandler) handler).newOutboundBuffer(this); } catch (Exception e) { throw new ChannelPipelineException("A user handler failed to create a new outbound buffer.", e); } finally { @@ -1105,7 +1107,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } } else { - prevOut = null; + out = null; } } @@ -1140,32 +1142,82 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelBufferHolder in() { + public ChannelBufferHolder inbound() { return in; } @Override - public ChannelBufferHolder prevOut() { - return prevOut; + public ChannelBufferHolder outbound() { + return out; } @Override - public ChannelBufferHolder nextIn() { - DefaultChannelHandlerContext next = nextInboundContext(this.next); - if (next != null) { - return next.in(); - } else { - throw new NoSuchElementException("no inbound buffer in the rest of the pipeline"); + public ChannelBuffer nextInboundByteBuffer() { + DefaultChannelHandlerContext ctx = this; + for (;;) { + ctx = nextInboundContext(ctx.next); + if (ctx == null) { + return null; + } + ChannelBufferHolder nextIn = ctx.inbound(); + if (nextIn.hasByteBuffer()) { + return nextIn.byteBuffer(); + } } } @Override - public ChannelBufferHolder out() { - DefaultChannelHandlerContext next = nextOutboundContext(prev); - if (next != null) { - return next.prevOut(); - } else { - return channel().unsafe().out(); + public Queue nextInboundMessageBuffer() { + DefaultChannelHandlerContext ctx = this; + for (;;) { + ctx = nextInboundContext(ctx.next); + if (ctx == null) { + return null; + } + ChannelBufferHolder nextIn = ctx.inbound(); + if (nextIn.hasMessageBuffer()) { + return nextIn.messageBuffer(); + } + } + } + + @Override + public ChannelBuffer nextOutboundByteBuffer() { + DefaultChannelHandlerContext ctx = this; + for (;;) { + ctx = nextOutboundContext(ctx.prev); + if (ctx == null) { + ChannelBufferHolder lastOut = channel().unsafe().out(); + if (lastOut.hasByteBuffer()) { + return lastOut.byteBuffer(); + } else { + return null; + } + } + ChannelBufferHolder nextOut = ctx.outbound(); + if (nextOut.hasByteBuffer()) { + return nextOut.byteBuffer(); + } + } + } + + @Override + public Queue nextOutboundMessageBuffer() { + DefaultChannelHandlerContext ctx = this; + for (;;) { + ctx = nextOutboundContext(ctx.prev); + if (ctx == null) { + ChannelBufferHolder lastOut = channel().unsafe().out(); + if (lastOut.hasMessageBuffer()) { + return lastOut.messageBuffer(); + } else { + return null; + } + } + ChannelBufferHolder nextOut = ctx.outbound(); + if (nextOut.hasMessageBuffer()) { + return nextOut.messageBuffer(); + } } } @@ -1304,9 +1356,9 @@ public class DefaultChannelPipeline implements ChannelPipeline { public ChannelFuture write(Object message, ChannelFuture future) { if (message instanceof ChannelBuffer) { ChannelBuffer m = (ChannelBuffer) message; - out().byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes()); + nextOutboundByteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes()); } else { - out().messageBuffer().add(message); + nextOutboundMessageBuffer().add(message); } return flush(future); } diff --git a/transport/src/main/java/io/netty/channel/ServerChannelBootstrap.java b/transport/src/main/java/io/netty/channel/ServerChannelBootstrap.java index cf6090ea84..f0aa11a5b2 100644 --- a/transport/src/main/java/io/netty/channel/ServerChannelBootstrap.java +++ b/transport/src/main/java/io/netty/channel/ServerChannelBootstrap.java @@ -180,7 +180,7 @@ public class ServerChannelBootstrap { @Override public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) { - Queue in = ctx.in().messageBuffer(); + Queue in = ctx.inbound().messageBuffer(); for (;;) { Channel child = in.poll(); if (child == null) { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java index 59c6f70482..6bbd663a85 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java @@ -34,7 +34,7 @@ abstract class AbstractNioMessageChannel extends AbstractNioChannel { assert eventLoop().inEventLoop(); final ChannelPipeline pipeline = pipeline(); - final ChannelBufferHolder buf = pipeline.nextIn(); + final ChannelBufferHolder buf = pipeline.inbound(); boolean closed = false; boolean read = false; try { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java index e30fc59b77..834bb49c1c 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java @@ -36,7 +36,7 @@ abstract class AbstractNioStreamChannel extends AbstractNioChannel { assert eventLoop().inEventLoop(); final ChannelPipeline pipeline = pipeline(); - final ChannelBufferHolder buf = pipeline.nextIn(); + final ChannelBufferHolder buf = pipeline.inbound(); boolean closed = false; boolean read = false; try { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java index af8acb606d..78bfcb3614 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java @@ -32,7 +32,7 @@ abstract class AbstractOioMessageChannel extends AbstractOioChannel { assert eventLoop().inEventLoop(); final ChannelPipeline pipeline = pipeline(); - final ChannelBufferHolder buf = pipeline.nextIn(); + final ChannelBufferHolder buf = pipeline.inbound(); boolean closed = false; boolean read = false; try { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java index f64b99334c..f06e03e9f0 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java @@ -33,7 +33,7 @@ abstract class AbstractOioStreamChannel extends AbstractOioChannel { assert eventLoop().inEventLoop(); final ChannelPipeline pipeline = pipeline(); - final ChannelBufferHolder buf = pipeline.nextIn(); + final ChannelBufferHolder buf = pipeline.inbound(); boolean closed = false; boolean read = false; try {