diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java index 3b5eff09ba..35ef155f40 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java @@ -78,7 +78,7 @@ public class SpdySessionHandler extends ChannelHandlerAdapter { @Override public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { - Queue in = ctx.in().messageBuffer(); + Queue in = ctx.inbound().messageBuffer(); for (;;) { Object msg = in.poll(); if (msg == null) { @@ -272,7 +272,7 @@ public class SpdySessionHandler extends ChannelHandlerAdapter { } } - ctx.nextIn().messageBuffer().add(msg); + ctx.nextInboundMessageBuffer().add(msg); } @Override @@ -303,7 +303,7 @@ public class SpdySessionHandler extends ChannelHandlerAdapter { public void flush(ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { - Queue in = ctx.prevOut().messageBuffer(); + Queue in = ctx.outbound().messageBuffer(); for (;;) { Object msg = in.poll(); if (msg == null) { @@ -394,7 +394,7 @@ public class SpdySessionHandler extends ChannelHandlerAdapter { } } - ctx.out().messageBuffer().add(msg); + ctx.nextOutboundMessageBuffer().add(msg); } /* diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/AbstractSocketSpdyEchoTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/AbstractSocketSpdyEchoTest.java index ba1b38df92..464bbf130e 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/spdy/AbstractSocketSpdyEchoTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/AbstractSocketSpdyEchoTest.java @@ -248,7 +248,7 @@ public abstract class AbstractSocketSpdyEchoTest { @Override public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { - ChannelBuffer m = ctx.in().byteBuffer().readBytes(ctx.in().byteBuffer().readableBytes()); + ChannelBuffer m = ctx.inbound().byteBuffer().readBytes(ctx.inbound().byteBuffer().readableBytes()); byte[] actual = new byte[m.readableBytes()]; m.getBytes(0, actual); diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java index 348ff6ad0e..efde1a8a71 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java @@ -19,7 +19,7 @@ public abstract class MessageToMessageDecoder extends ChannelInboundHandle @Override public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { - Queue in = ctx.in().messageBuffer(); + Queue in = ctx.inbound().messageBuffer(); boolean decoded = false; for (;;) { try { @@ -35,7 +35,7 @@ public abstract class MessageToMessageDecoder extends ChannelInboundHandle continue; } - if (unfoldAndAdd(ctx, ctx.nextIn(), emsg)) { + if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), emsg)) { decoded = true; } } catch (Throwable t) { diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java index 0a53086591..e8da448598 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java @@ -1,6 +1,5 @@ package io.netty.handler.codec; -import io.netty.buffer.ChannelBuffer; import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelFuture; @@ -20,7 +19,7 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundHandl @Override public void flush(ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { - Queue in = ctx.prevOut().messageBuffer(); + Queue in = ctx.outbound().messageBuffer(); boolean encoded = false; for (;;) { try { @@ -36,7 +35,7 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundHandl continue; } - if (unfoldAndAdd(ctx, ctx.out(), emsg)) { + if (unfoldAndAdd(ctx, ctx.nextOutboundMessageBuffer(), emsg)) { encoded = true; } } catch (Throwable t) { @@ -56,7 +55,7 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundHandl public abstract O encode(ChannelOutboundHandlerContext ctx, I msg) throws Exception; static boolean unfoldAndAdd( - ChannelHandlerContext ctx, ChannelBufferHolder dst, Object msg) throws Exception { + ChannelHandlerContext ctx, Queue dst, Object msg) throws Exception { if (msg == null) { return false; } @@ -94,18 +93,7 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundHandl return added; } - if (dst.hasMessageBuffer()) { - dst.messageBuffer().add(msg); - } else if (msg instanceof ChannelBuffer) { - ChannelBuffer buf = (ChannelBuffer) msg; - if (!buf.readable()) { - return false; - } - dst.byteBuffer().writeBytes(buf, buf.readerIndex(), buf.readableBytes()); - } else { - throw new UnsupportedMessageTypeException(msg, ChannelBuffer.class); - } - + dst.add(msg); return true; } } diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToStreamEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToStreamEncoder.java index 31a19cb538..db2a88d607 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToStreamEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToStreamEncoder.java @@ -19,8 +19,8 @@ public abstract class MessageToStreamEncoder extends ChannelOutboundHandlerAd @Override public void flush(ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { - Queue in = ctx.prevOut().messageBuffer(); - ChannelBuffer out = ctx.out().byteBuffer(); + Queue in = ctx.outbound().messageBuffer(); + ChannelBuffer out = ctx.nextOutboundByteBuffer(); int oldOutSize = out.readableBytes(); for (;;) { diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java index 5394c5cd7d..b7607776e3 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java @@ -378,7 +378,7 @@ public abstract class ReplayingDecoder> extends StreamToMes } try { - if (unfoldAndAdd(ctx, ctx.nextIn(), decodeLast(ctx, replayable))) { + if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), decodeLast(ctx, replayable))) { fireInboundBufferUpdated(ctx, in); } } catch (Signal replay) { @@ -442,7 +442,7 @@ public abstract class ReplayingDecoder> extends StreamToMes } // A successful decode - if (unfoldAndAdd(ctx, ctx.nextIn(), result)) { + if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), result)) { decoded = true; } } catch (Throwable t) { diff --git a/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java index 04248ce398..74da6c515b 100644 --- a/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java @@ -27,13 +27,13 @@ public abstract class StreamToMessageDecoder extends ChannelInboundHandlerAda @Override public void channelInactive(ChannelInboundHandlerContext ctx) throws Exception { - ChannelBuffer in = ctx.in().byteBuffer(); + ChannelBuffer in = ctx.inbound().byteBuffer(); if (in.readable()) { callDecode(ctx); } try { - if (unfoldAndAdd(ctx, ctx.nextIn(), decodeLast(ctx, in))) { + if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), decodeLast(ctx, in))) { in.discardReadBytes(); ctx.fireInboundBufferUpdated(); } @@ -49,7 +49,7 @@ public abstract class StreamToMessageDecoder extends ChannelInboundHandlerAda } protected void callDecode(ChannelInboundHandlerContext ctx) { - ChannelBuffer in = ctx.in().byteBuffer(); + ChannelBuffer in = ctx.inbound().byteBuffer(); boolean decoded = false; for (;;) { @@ -69,7 +69,7 @@ public abstract class StreamToMessageDecoder extends ChannelInboundHandlerAda } } - if (unfoldAndAdd(ctx, ctx.nextIn(), o)) { + if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), o)) { decoded = true; } else { break; @@ -109,10 +109,10 @@ public abstract class StreamToMessageDecoder extends ChannelInboundHandlerAda // the new handler. ctx.pipeline().addAfter(ctx.name(), newHandlerName, newHandler); - ChannelBuffer in = ctx.in().byteBuffer(); + ChannelBuffer in = ctx.inbound().byteBuffer(); try { if (in.readable()) { - ctx.nextIn().byteBuffer().writeBytes(ctx.in().byteBuffer()); + ctx.nextInboundByteBuffer().writeBytes(ctx.inbound().byteBuffer()); ctx.fireInboundBufferUpdated(); } } finally { diff --git a/codec/src/main/java/io/netty/handler/codec/StreamToStreamDecoder.java b/codec/src/main/java/io/netty/handler/codec/StreamToStreamDecoder.java index 219b9fea5b..c28d845943 100644 --- a/codec/src/main/java/io/netty/handler/codec/StreamToStreamDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/StreamToStreamDecoder.java @@ -21,12 +21,12 @@ public abstract class StreamToStreamDecoder extends ChannelInboundHandlerAdapter @Override public void channelInactive(ChannelInboundHandlerContext ctx) throws Exception { - ChannelBuffer in = ctx.in().byteBuffer(); + ChannelBuffer in = ctx.inbound().byteBuffer(); if (!in.readable()) { callDecode(ctx); } - ChannelBuffer out = ctx.nextIn().byteBuffer(); + ChannelBuffer out = ctx.nextInboundByteBuffer(); int oldOutSize = out.readableBytes(); try { decodeLast(ctx, in, out); @@ -47,8 +47,8 @@ public abstract class StreamToStreamDecoder extends ChannelInboundHandlerAdapter } private void callDecode(ChannelInboundHandlerContext ctx) { - ChannelBuffer in = ctx.in().byteBuffer(); - ChannelBuffer out = ctx.nextIn().byteBuffer(); + ChannelBuffer in = ctx.inbound().byteBuffer(); + ChannelBuffer out = ctx.nextInboundByteBuffer(); int oldOutSize = out.readableBytes(); while (in.readable()) { diff --git a/codec/src/main/java/io/netty/handler/codec/StreamToStreamEncoder.java b/codec/src/main/java/io/netty/handler/codec/StreamToStreamEncoder.java index 38e4987613..8894a22010 100644 --- a/codec/src/main/java/io/netty/handler/codec/StreamToStreamEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/StreamToStreamEncoder.java @@ -17,8 +17,8 @@ public abstract class StreamToStreamEncoder extends ChannelOutboundHandlerAdapte @Override public void flush(ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { - ChannelBuffer in = ctx.prevOut().byteBuffer(); - ChannelBuffer out = ctx.out().byteBuffer(); + ChannelBuffer in = ctx.outbound().byteBuffer(); + ChannelBuffer out = ctx.nextOutboundByteBuffer(); int oldOutSize = out.readableBytes(); while (in.readable()) { diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/DecoderEmbedder.java b/codec/src/main/java/io/netty/handler/codec/embedder/DecoderEmbedder.java index ec4892bfbb..b38f905b88 100644 --- a/codec/src/main/java/io/netty/handler/codec/embedder/DecoderEmbedder.java +++ b/codec/src/main/java/io/netty/handler/codec/embedder/DecoderEmbedder.java @@ -57,7 +57,7 @@ public class DecoderEmbedder extends AbstractCodecEmbedder { @Override public boolean offer(Object input) { - ChannelBufferHolder in = pipeline().nextIn(); + ChannelBufferHolder in = pipeline().inbound(); if (in.hasByteBuffer()) { in.byteBuffer().writeBytes((ChannelBuffer) input); } else { diff --git a/example/src/main/java/io/netty/example/discard/DiscardClientHandler.java b/example/src/main/java/io/netty/example/discard/DiscardClientHandler.java index bdcfe1ccd6..fb4f836135 100644 --- a/example/src/main/java/io/netty/example/discard/DiscardClientHandler.java +++ b/example/src/main/java/io/netty/example/discard/DiscardClientHandler.java @@ -34,7 +34,6 @@ public class DiscardClientHandler extends ChannelInboundStreamHandlerAdapter { private final byte[] content; private ChannelInboundHandlerContext ctx; - private ChannelBuffer out; public DiscardClientHandler(int messageSize) { if (messageSize <= 0) { @@ -49,7 +48,6 @@ public class DiscardClientHandler extends ChannelInboundStreamHandlerAdapter { public void channelActive(ChannelInboundHandlerContext ctx) throws Exception { this.ctx = ctx; - out = ctx.out().byteBuffer(); // Send the initial messages. generateTraffic(); } @@ -77,6 +75,7 @@ public class DiscardClientHandler extends ChannelInboundStreamHandlerAdapter { private void generateTraffic() { // Fill the outbound buffer up to 64KiB + ChannelBuffer out = ctx.nextOutboundByteBuffer(); while (out.readableBytes() < 65536) { out.writeBytes(content); } @@ -90,7 +89,7 @@ public class DiscardClientHandler extends ChannelInboundStreamHandlerAdapter { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { - out.clear(); + ctx.nextOutboundByteBuffer().discardReadBytes(); generateTraffic(); } } diff --git a/example/src/main/java/io/netty/example/discard/DiscardServerHandler.java b/example/src/main/java/io/netty/example/discard/DiscardServerHandler.java index 8c3167013f..d617fe16c9 100644 --- a/example/src/main/java/io/netty/example/discard/DiscardServerHandler.java +++ b/example/src/main/java/io/netty/example/discard/DiscardServerHandler.java @@ -34,7 +34,7 @@ public class DiscardServerHandler extends ChannelInboundStreamHandlerAdapter { public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { // Discard the received data silently. - ctx.in().byteBuffer().clear(); + ctx.inbound().byteBuffer().clear(); } diff --git a/example/src/main/java/io/netty/example/echo/EchoClientHandler.java b/example/src/main/java/io/netty/example/echo/EchoClientHandler.java index 56a48d3c29..e84ffc9c56 100644 --- a/example/src/main/java/io/netty/example/echo/EchoClientHandler.java +++ b/example/src/main/java/io/netty/example/echo/EchoClientHandler.java @@ -62,8 +62,8 @@ public class EchoClientHandler extends ChannelInboundHandlerAdapter { @Override public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) { - ChannelBuffer in = ctx.in().byteBuffer(); - ChannelBuffer out = ctx.out().byteBuffer(); + ChannelBuffer in = ctx.inbound().byteBuffer(); + ChannelBuffer out = ctx.nextOutboundByteBuffer(); out.discardReadBytes(); out.writeBytes(in); in.discardReadBytes(); diff --git a/example/src/main/java/io/netty/example/echo/EchoServerHandler.java b/example/src/main/java/io/netty/example/echo/EchoServerHandler.java index 0b388b182b..f24c10408f 100644 --- a/example/src/main/java/io/netty/example/echo/EchoServerHandler.java +++ b/example/src/main/java/io/netty/example/echo/EchoServerHandler.java @@ -39,8 +39,8 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) { - ChannelBuffer in = ctx.in().byteBuffer(); - ChannelBuffer out = ctx.out().byteBuffer(); + ChannelBuffer in = ctx.inbound().byteBuffer(); + ChannelBuffer out = ctx.nextOutboundByteBuffer(); out.discardReadBytes(); out.writeBytes(in); in.discardReadBytes(); diff --git a/example/src/main/java/io/netty/example/factorial/FactorialClientHandler.java b/example/src/main/java/io/netty/example/factorial/FactorialClientHandler.java index 8ec3ddd633..6af06deb23 100644 --- a/example/src/main/java/io/netty/example/factorial/FactorialClientHandler.java +++ b/example/src/main/java/io/netty/example/factorial/FactorialClientHandler.java @@ -40,7 +40,6 @@ public class FactorialClientHandler extends ChannelInboundMessageHandlerAdapter< FactorialClientHandler.class.getName()); private ChannelInboundHandlerContext ctx; - private Queue out; private int i = 1; private int receivedMessages; private final int count; @@ -68,7 +67,6 @@ public class FactorialClientHandler extends ChannelInboundMessageHandlerAdapter< @Override public void channelActive(ChannelInboundHandlerContext ctx) { this.ctx = ctx; - out = ctx.out().messageBuffer(); sendNumbers(); } @@ -101,6 +99,7 @@ public class FactorialClientHandler extends ChannelInboundMessageHandlerAdapter< private void sendNumbers() { // Do not send more than 4096 numbers. boolean finished = false; + Queue out = ctx.nextOutboundMessageBuffer(); while (out.size() < 4096) { if (i <= count) { out.add(Integer.valueOf(i)); diff --git a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java index a25db516c8..18eb54f82e 100644 --- a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java +++ b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java @@ -352,7 +352,7 @@ public class LoggingHandler extends ChannelHandlerAdapter { public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { if (getLogger().isEnabled(internalLevel)) { - logger.log(internalLevel, format(ctx, formatBuffer("INBUF", ctx.in()))); + logger.log(internalLevel, format(ctx, formatBuffer("INBUF", ctx.inbound()))); } ctx.fireInboundBufferUpdated(); } @@ -407,7 +407,7 @@ public class LoggingHandler extends ChannelHandlerAdapter { public void flush(ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { if (getLogger().isEnabled(internalLevel)) { - logger.log(internalLevel, format(ctx, formatBuffer("OUTBUF", ctx.prevOut()))); + logger.log(internalLevel, format(ctx, formatBuffer("OUTBUF", ctx.outbound()))); } ctx.flush(future); } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 13d6ec0f4a..03d11ee922 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -259,8 +259,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public ChannelBufferHolder out() { - return pipeline().out(); + public ChannelBufferHolder outbound() { + return pipeline().outbound(); } @Override diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index d868b3ec5e..cb30b6a859 100644 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -37,7 +37,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S } @Override - public ChannelBufferHolder out() { + public ChannelBufferHolder outbound() { return ChannelBufferHolders.discardBuffer(); } diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index 9d36ae35a0..a1ecc993a0 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -136,6 +136,8 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu boolean isRegistered(); boolean isActive(); + ChannelBufferHolder outbound(); + /** * Returns the local address where this channel is bound to. The returned * {@link SocketAddress} is supposed to be down-cast into more concrete diff --git a/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java b/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java index 9cd17f6671..018094c35e 100644 --- a/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java +++ b/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java @@ -59,9 +59,9 @@ public final class ChannelBufferHolder { case 0: return msgBuf != null; case 1: - return ctx.nextIn().hasMessageBuffer(); + return ctx.nextInboundMessageBuffer() != null; case 2: - return ctx.out().hasMessageBuffer(); + return ctx.nextOutboundMessageBuffer() != null; default: throw new Error(); } @@ -72,15 +72,14 @@ public final class ChannelBufferHolder { case 0: return byteBuf != null; case 1: - return ctx.nextIn().hasByteBuffer(); + return ctx.nextInboundByteBuffer() != null; case 2: - return ctx.out().hasByteBuffer(); + return ctx.nextOutboundByteBuffer() != null; default: throw new Error(); } } - @SuppressWarnings("unchecked") public Queue messageBuffer() { switch (bypassDirection) { case 0: @@ -89,9 +88,9 @@ public final class ChannelBufferHolder { } return msgBuf; case 1: - return (Queue) ctx.nextIn().messageBuffer(); + return (Queue) ctx.nextInboundMessageBuffer(); case 2: - return (Queue) ctx.out().messageBuffer(); + return (Queue) ctx.nextOutboundMessageBuffer(); default: throw new Error(); } @@ -105,9 +104,9 @@ public final class ChannelBufferHolder { } return byteBuf; case 1: - return ctx.nextIn().byteBuffer(); + return ctx.nextInboundByteBuffer(); case 2: - return ctx.out().byteBuffer(); + return ctx.nextOutboundByteBuffer(); default: throw new Error(); } @@ -118,14 +117,18 @@ public final class ChannelBufferHolder { switch (bypassDirection) { case 0: if (msgBuf != null) { - return msgBuf.toString(); + if (byteBuf != null) { + return "CatchAllBuffer"; + } else { + return "MessageBuffer(" + msgBuf.size() + ')'; + } } else { return byteBuf.toString(); } case 1: - return ctx.nextIn().toString(); + return "InboundBypassBuffer"; case 2: - return ctx.out().toString(); + return "OutboundBypassBuffer"; default: throw new Error(); } @@ -140,9 +143,8 @@ public final class ChannelBufferHolder { return byteBuf.readableBytes(); } case 1: - return ctx.nextIn().size(); case 2: - return ctx.out().size(); + throw new UnsupportedOperationException(); default: throw new Error(); } @@ -157,9 +159,8 @@ public final class ChannelBufferHolder { return byteBuf.readable(); } case 1: - return ctx.nextIn().isEmpty(); case 2: - return ctx.out().isEmpty(); + throw new UnsupportedOperationException(); default: throw new Error(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java index 581d4ac832..16d7de2b1a 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java @@ -16,9 +16,11 @@ package io.netty.channel; +import io.netty.buffer.ChannelBuffer; import io.netty.util.AttributeMap; import java.nio.channels.Channels; +import java.util.Queue; /** * Enables a {@link ChannelHandler} to interact with its {@link ChannelPipeline} @@ -133,4 +135,10 @@ public interface ChannelHandlerContext boolean canHandleInbound(); boolean canHandleOutbound(); + + ChannelBuffer nextInboundByteBuffer(); + Queue nextInboundMessageBuffer(); + + ChannelBuffer nextOutboundByteBuffer(); + Queue nextOutboundMessageBuffer(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java index a95eb9e8ca..c0387e9f3d 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java @@ -61,14 +61,14 @@ public abstract class ChannelInboundHandlerAdapter implements ChannelInboundH } static void inboundBufferUpdated0(ChannelInboundHandlerContext ctx) { - if (ctx.in().isBypass()) { + if (ctx.inbound().isBypass()) { ctx.fireInboundBufferUpdated(); return; } - if (ctx.in().hasMessageBuffer()) { - Queue in = ctx.in().messageBuffer(); - Queue nextIn = ctx.nextIn().messageBuffer(); + if (ctx.inbound().hasMessageBuffer()) { + Queue in = ctx.inbound().messageBuffer(); + Queue nextIn = ctx.nextInboundMessageBuffer(); for (;;) { I msg = in.poll(); if (msg == null) { @@ -77,8 +77,8 @@ public abstract class ChannelInboundHandlerAdapter implements ChannelInboundH nextIn.add(msg); } } else { - ChannelBuffer in = ctx.in().byteBuffer(); - ChannelBuffer nextIn = ctx.nextIn().byteBuffer(); + ChannelBuffer in = ctx.inbound().byteBuffer(); + ChannelBuffer nextIn = ctx.nextInboundByteBuffer(); nextIn.writeBytes(in); in.discardReadBytes(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerContext.java b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerContext.java index 8657a23461..7537a3529b 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerContext.java @@ -1,5 +1,5 @@ package io.netty.channel; public interface ChannelInboundHandlerContext extends ChannelHandlerContext { - ChannelBufferHolder in(); + ChannelBufferHolder inbound(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java index 993b80ad91..82e1fbe701 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java @@ -2,8 +2,6 @@ package io.netty.channel; public interface ChannelInboundInvoker { - ChannelBufferHolder nextIn(); - void fireChannelRegistered(); void fireChannelUnregistered(); void fireChannelActive(); diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java index 4b56a0f919..c103677c2d 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java @@ -14,7 +14,7 @@ public class ChannelInboundMessageHandlerAdapter extends @Override public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { - Queue in = ctx.in().messageBuffer(); + Queue in = ctx.inbound().messageBuffer(); for (;;) { I msg = in.poll(); if (msg == null) { @@ -30,6 +30,6 @@ public class ChannelInboundMessageHandlerAdapter extends } public void messageReceived(ChannelInboundHandlerContext ctx, I msg) throws Exception { - ctx.nextIn().messageBuffer().add(msg); + ctx.nextInboundMessageBuffer().add(msg); } } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java index 794c6e45ab..784721364b 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java @@ -57,14 +57,14 @@ public abstract class ChannelOutboundHandlerAdapter implements ChannelOutboun } static void flush0(ChannelOutboundHandlerContext ctx, ChannelFuture future) { - if (ctx.prevOut().isBypass()) { + if (ctx.outbound().isBypass()) { ctx.flush(future); return; } - if (ctx.prevOut().hasMessageBuffer()) { - Queue out = ctx.prevOut().messageBuffer(); - Queue nextOut = ctx.out().messageBuffer(); + if (ctx.outbound().hasMessageBuffer()) { + Queue out = ctx.outbound().messageBuffer(); + Queue nextOut = ctx.nextOutboundMessageBuffer(); for (;;) { O msg = out.poll(); if (msg == null) { @@ -73,8 +73,8 @@ public abstract class ChannelOutboundHandlerAdapter implements ChannelOutboun nextOut.add(msg); } } else { - ChannelBuffer out = ctx.prevOut().byteBuffer(); - ChannelBuffer nextOut = ctx.out().byteBuffer(); + ChannelBuffer out = ctx.outbound().byteBuffer(); + ChannelBuffer nextOut = ctx.nextOutboundByteBuffer(); nextOut.writeBytes(out); out.discardReadBytes(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerContext.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerContext.java index 5b0471fbf8..25324402bd 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerContext.java @@ -2,5 +2,5 @@ package io.netty.channel; public interface ChannelOutboundHandlerContext extends ChannelHandlerContext { - ChannelBufferHolder prevOut(); + ChannelBufferHolder outbound(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java index 9e5eb33fae..0bdbeeb1c8 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java @@ -3,8 +3,6 @@ package io.netty.channel; import java.net.SocketAddress; public interface ChannelOutboundInvoker { - ChannelBufferHolder out(); - ChannelFuture bind(SocketAddress localAddress); ChannelFuture connect(SocketAddress remoteAddress); ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress); diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java index 8d2da94d08..29f0ba1a3e 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/ChannelPipeline.java @@ -205,6 +205,9 @@ import java.util.NoSuchElementException; */ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker { + ChannelBufferHolder inbound(); + ChannelBufferHolder outbound(); + /** * Inserts a {@link ChannelHandler} at the first position of this pipeline. * diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 64ff3215d0..bbb02f2494 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -27,6 +27,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Queue; /** * The default {@link ChannelPipeline} implementation. It is usually created @@ -570,19 +571,19 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelBufferHolder nextIn() { + public ChannelBufferHolder inbound() { DefaultChannelHandlerContext ctx = firstInboundContext(); if (ctx != null) { - return ctx.in(); + return ctx.inbound(); } return null; } @Override - public ChannelBufferHolder out() { + public ChannelBufferHolder outbound() { DefaultChannelHandlerContext ctx = firstOutboundContext(); if (ctx != null) { - return ctx.prevOut(); + return ctx.outbound(); } return channel().unsafe().out(); } @@ -903,11 +904,12 @@ public class DefaultChannelPipeline implements ChannelPipeline { } validateFuture(future); - if (out().hasMessageBuffer()) { - out().messageBuffer().add(message); + ChannelBufferHolder out = outbound(); + if (out.hasMessageBuffer()) { + out.messageBuffer().add(message); } else if (message instanceof ChannelBuffer) { ChannelBuffer m = (ChannelBuffer) message; - out().byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes()); + out.byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes()); } else { throw new IllegalArgumentException( "cannot write a message whose type is not " + @@ -1057,7 +1059,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { private final boolean canHandleInbound; private final boolean canHandleOutbound; private final ChannelBufferHolder in; - private final ChannelBufferHolder prevOut; + private final ChannelBufferHolder out; @SuppressWarnings("unchecked") DefaultChannelHandlerContext( @@ -1096,7 +1098,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } if (canHandleOutbound) { try { - prevOut = ((ChannelOutboundHandler) handler).newOutboundBuffer(this); + out = ((ChannelOutboundHandler) handler).newOutboundBuffer(this); } catch (Exception e) { throw new ChannelPipelineException("A user handler failed to create a new outbound buffer.", e); } finally { @@ -1105,7 +1107,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } } else { - prevOut = null; + out = null; } } @@ -1140,32 +1142,82 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelBufferHolder in() { + public ChannelBufferHolder inbound() { return in; } @Override - public ChannelBufferHolder prevOut() { - return prevOut; + public ChannelBufferHolder outbound() { + return out; } @Override - public ChannelBufferHolder nextIn() { - DefaultChannelHandlerContext next = nextInboundContext(this.next); - if (next != null) { - return next.in(); - } else { - throw new NoSuchElementException("no inbound buffer in the rest of the pipeline"); + public ChannelBuffer nextInboundByteBuffer() { + DefaultChannelHandlerContext ctx = this; + for (;;) { + ctx = nextInboundContext(ctx.next); + if (ctx == null) { + return null; + } + ChannelBufferHolder nextIn = ctx.inbound(); + if (nextIn.hasByteBuffer()) { + return nextIn.byteBuffer(); + } } } @Override - public ChannelBufferHolder out() { - DefaultChannelHandlerContext next = nextOutboundContext(prev); - if (next != null) { - return next.prevOut(); - } else { - return channel().unsafe().out(); + public Queue nextInboundMessageBuffer() { + DefaultChannelHandlerContext ctx = this; + for (;;) { + ctx = nextInboundContext(ctx.next); + if (ctx == null) { + return null; + } + ChannelBufferHolder nextIn = ctx.inbound(); + if (nextIn.hasMessageBuffer()) { + return nextIn.messageBuffer(); + } + } + } + + @Override + public ChannelBuffer nextOutboundByteBuffer() { + DefaultChannelHandlerContext ctx = this; + for (;;) { + ctx = nextOutboundContext(ctx.prev); + if (ctx == null) { + ChannelBufferHolder lastOut = channel().unsafe().out(); + if (lastOut.hasByteBuffer()) { + return lastOut.byteBuffer(); + } else { + return null; + } + } + ChannelBufferHolder nextOut = ctx.outbound(); + if (nextOut.hasByteBuffer()) { + return nextOut.byteBuffer(); + } + } + } + + @Override + public Queue nextOutboundMessageBuffer() { + DefaultChannelHandlerContext ctx = this; + for (;;) { + ctx = nextOutboundContext(ctx.prev); + if (ctx == null) { + ChannelBufferHolder lastOut = channel().unsafe().out(); + if (lastOut.hasMessageBuffer()) { + return lastOut.messageBuffer(); + } else { + return null; + } + } + ChannelBufferHolder nextOut = ctx.outbound(); + if (nextOut.hasMessageBuffer()) { + return nextOut.messageBuffer(); + } } } @@ -1304,9 +1356,9 @@ public class DefaultChannelPipeline implements ChannelPipeline { public ChannelFuture write(Object message, ChannelFuture future) { if (message instanceof ChannelBuffer) { ChannelBuffer m = (ChannelBuffer) message; - out().byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes()); + nextOutboundByteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes()); } else { - out().messageBuffer().add(message); + nextOutboundMessageBuffer().add(message); } return flush(future); } diff --git a/transport/src/main/java/io/netty/channel/ServerChannelBootstrap.java b/transport/src/main/java/io/netty/channel/ServerChannelBootstrap.java index cf6090ea84..f0aa11a5b2 100644 --- a/transport/src/main/java/io/netty/channel/ServerChannelBootstrap.java +++ b/transport/src/main/java/io/netty/channel/ServerChannelBootstrap.java @@ -180,7 +180,7 @@ public class ServerChannelBootstrap { @Override public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) { - Queue in = ctx.in().messageBuffer(); + Queue in = ctx.inbound().messageBuffer(); for (;;) { Channel child = in.poll(); if (child == null) { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java index 59c6f70482..6bbd663a85 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java @@ -34,7 +34,7 @@ abstract class AbstractNioMessageChannel extends AbstractNioChannel { assert eventLoop().inEventLoop(); final ChannelPipeline pipeline = pipeline(); - final ChannelBufferHolder buf = pipeline.nextIn(); + final ChannelBufferHolder buf = pipeline.inbound(); boolean closed = false; boolean read = false; try { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java index e30fc59b77..834bb49c1c 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java @@ -36,7 +36,7 @@ abstract class AbstractNioStreamChannel extends AbstractNioChannel { assert eventLoop().inEventLoop(); final ChannelPipeline pipeline = pipeline(); - final ChannelBufferHolder buf = pipeline.nextIn(); + final ChannelBufferHolder buf = pipeline.inbound(); boolean closed = false; boolean read = false; try { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java index af8acb606d..78bfcb3614 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java @@ -32,7 +32,7 @@ abstract class AbstractOioMessageChannel extends AbstractOioChannel { assert eventLoop().inEventLoop(); final ChannelPipeline pipeline = pipeline(); - final ChannelBufferHolder buf = pipeline.nextIn(); + final ChannelBufferHolder buf = pipeline.inbound(); boolean closed = false; boolean read = false; try { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java index f64b99334c..f06e03e9f0 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java @@ -33,7 +33,7 @@ abstract class AbstractOioStreamChannel extends AbstractOioChannel { assert eventLoop().inEventLoop(); final ChannelPipeline pipeline = pipeline(); - final ChannelBufferHolder buf = pipeline.nextIn(); + final ChannelBufferHolder buf = pipeline.inbound(); boolean closed = false; boolean read = false; try {