Refactor the pipeline API to support stacked codecs

- Previous API did not support the pipeline which contains multiple
  MessageToStreamEncoders because there was no way to find the closest
  outbound byte buffer.  Now you always get the correct buffer even if
  the handler that provides the buffer is placed distantly.
  For example:
  
    Channel -> MsgAEncoder -> MsgBEncoder -> MsgCEncoder
  
  Msg(A|B|C)Encoder will all have access to the channel's outbound
  byte buffer.  Previously, it was simply impossible.

- Improved ChannelBufferHolder.toString()
This commit is contained in:
Trustin Lee 2012-05-29 12:09:29 -07:00
parent 81e8c49931
commit 026715e818
35 changed files with 171 additions and 123 deletions

View File

@ -78,7 +78,7 @@ public class SpdySessionHandler extends ChannelHandlerAdapter<Object, Object> {
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<Object> ctx)
throws Exception {
Queue<Object> in = ctx.in().messageBuffer();
Queue<Object> in = ctx.inbound().messageBuffer();
for (;;) {
Object msg = in.poll();
if (msg == null) {
@ -272,7 +272,7 @@ public class SpdySessionHandler extends ChannelHandlerAdapter<Object, Object> {
}
}
ctx.nextIn().messageBuffer().add(msg);
ctx.nextInboundMessageBuffer().add(msg);
}
@Override
@ -303,7 +303,7 @@ public class SpdySessionHandler extends ChannelHandlerAdapter<Object, Object> {
public void flush(ChannelOutboundHandlerContext<Object> ctx,
ChannelFuture future) throws Exception {
Queue<Object> in = ctx.prevOut().messageBuffer();
Queue<Object> in = ctx.outbound().messageBuffer();
for (;;) {
Object msg = in.poll();
if (msg == null) {
@ -394,7 +394,7 @@ public class SpdySessionHandler extends ChannelHandlerAdapter<Object, Object> {
}
}
ctx.out().messageBuffer().add(msg);
ctx.nextOutboundMessageBuffer().add(msg);
}
/*

View File

@ -248,7 +248,7 @@ public abstract class AbstractSocketSpdyEchoTest {
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<Byte> ctx)
throws Exception {
ChannelBuffer m = ctx.in().byteBuffer().readBytes(ctx.in().byteBuffer().readableBytes());
ChannelBuffer m = ctx.inbound().byteBuffer().readBytes(ctx.inbound().byteBuffer().readableBytes());
byte[] actual = new byte[m.readableBytes()];
m.getBytes(0, actual);

View File

@ -19,7 +19,7 @@ public abstract class MessageToMessageDecoder<I, O> extends ChannelInboundHandle
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<I> ctx)
throws Exception {
Queue<I> in = ctx.in().messageBuffer();
Queue<I> in = ctx.inbound().messageBuffer();
boolean decoded = false;
for (;;) {
try {
@ -35,7 +35,7 @@ public abstract class MessageToMessageDecoder<I, O> extends ChannelInboundHandle
continue;
}
if (unfoldAndAdd(ctx, ctx.nextIn(), emsg)) {
if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), emsg)) {
decoded = true;
}
} catch (Throwable t) {

View File

@ -1,6 +1,5 @@
package io.netty.handler.codec;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
@ -20,7 +19,7 @@ public abstract class MessageToMessageEncoder<I, O> extends ChannelOutboundHandl
@Override
public void flush(ChannelOutboundHandlerContext<I> ctx, ChannelFuture future) throws Exception {
Queue<I> in = ctx.prevOut().messageBuffer();
Queue<I> in = ctx.outbound().messageBuffer();
boolean encoded = false;
for (;;) {
try {
@ -36,7 +35,7 @@ public abstract class MessageToMessageEncoder<I, O> extends ChannelOutboundHandl
continue;
}
if (unfoldAndAdd(ctx, ctx.out(), emsg)) {
if (unfoldAndAdd(ctx, ctx.nextOutboundMessageBuffer(), emsg)) {
encoded = true;
}
} catch (Throwable t) {
@ -56,7 +55,7 @@ public abstract class MessageToMessageEncoder<I, O> extends ChannelOutboundHandl
public abstract O encode(ChannelOutboundHandlerContext<I> ctx, I msg) throws Exception;
static <T> boolean unfoldAndAdd(
ChannelHandlerContext ctx, ChannelBufferHolder<Object> dst, Object msg) throws Exception {
ChannelHandlerContext ctx, Queue<Object> dst, Object msg) throws Exception {
if (msg == null) {
return false;
}
@ -94,18 +93,7 @@ public abstract class MessageToMessageEncoder<I, O> extends ChannelOutboundHandl
return added;
}
if (dst.hasMessageBuffer()) {
dst.messageBuffer().add(msg);
} else if (msg instanceof ChannelBuffer) {
ChannelBuffer buf = (ChannelBuffer) msg;
if (!buf.readable()) {
return false;
}
dst.byteBuffer().writeBytes(buf, buf.readerIndex(), buf.readableBytes());
} else {
throw new UnsupportedMessageTypeException(msg, ChannelBuffer.class);
}
dst.add(msg);
return true;
}
}

View File

@ -19,8 +19,8 @@ public abstract class MessageToStreamEncoder<I> extends ChannelOutboundHandlerAd
@Override
public void flush(ChannelOutboundHandlerContext<I> ctx, ChannelFuture future) throws Exception {
Queue<I> in = ctx.prevOut().messageBuffer();
ChannelBuffer out = ctx.out().byteBuffer();
Queue<I> in = ctx.outbound().messageBuffer();
ChannelBuffer out = ctx.nextOutboundByteBuffer();
int oldOutSize = out.readableBytes();
for (;;) {

View File

@ -378,7 +378,7 @@ public abstract class ReplayingDecoder<O, S extends Enum<S>> extends StreamToMes
}
try {
if (unfoldAndAdd(ctx, ctx.nextIn(), decodeLast(ctx, replayable))) {
if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), decodeLast(ctx, replayable))) {
fireInboundBufferUpdated(ctx, in);
}
} catch (Signal replay) {
@ -442,7 +442,7 @@ public abstract class ReplayingDecoder<O, S extends Enum<S>> extends StreamToMes
}
// A successful decode
if (unfoldAndAdd(ctx, ctx.nextIn(), result)) {
if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), result)) {
decoded = true;
}
} catch (Throwable t) {

View File

@ -27,13 +27,13 @@ public abstract class StreamToMessageDecoder<O> extends ChannelInboundHandlerAda
@Override
public void channelInactive(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
ChannelBuffer in = ctx.in().byteBuffer();
ChannelBuffer in = ctx.inbound().byteBuffer();
if (in.readable()) {
callDecode(ctx);
}
try {
if (unfoldAndAdd(ctx, ctx.nextIn(), decodeLast(ctx, in))) {
if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), decodeLast(ctx, in))) {
in.discardReadBytes();
ctx.fireInboundBufferUpdated();
}
@ -49,7 +49,7 @@ public abstract class StreamToMessageDecoder<O> extends ChannelInboundHandlerAda
}
protected void callDecode(ChannelInboundHandlerContext<Byte> ctx) {
ChannelBuffer in = ctx.in().byteBuffer();
ChannelBuffer in = ctx.inbound().byteBuffer();
boolean decoded = false;
for (;;) {
@ -69,7 +69,7 @@ public abstract class StreamToMessageDecoder<O> extends ChannelInboundHandlerAda
}
}
if (unfoldAndAdd(ctx, ctx.nextIn(), o)) {
if (unfoldAndAdd(ctx, ctx.nextInboundMessageBuffer(), o)) {
decoded = true;
} else {
break;
@ -109,10 +109,10 @@ public abstract class StreamToMessageDecoder<O> extends ChannelInboundHandlerAda
// the new handler.
ctx.pipeline().addAfter(ctx.name(), newHandlerName, newHandler);
ChannelBuffer in = ctx.in().byteBuffer();
ChannelBuffer in = ctx.inbound().byteBuffer();
try {
if (in.readable()) {
ctx.nextIn().byteBuffer().writeBytes(ctx.in().byteBuffer());
ctx.nextInboundByteBuffer().writeBytes(ctx.inbound().byteBuffer());
ctx.fireInboundBufferUpdated();
}
} finally {

View File

@ -21,12 +21,12 @@ public abstract class StreamToStreamDecoder extends ChannelInboundHandlerAdapter
@Override
public void channelInactive(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
ChannelBuffer in = ctx.in().byteBuffer();
ChannelBuffer in = ctx.inbound().byteBuffer();
if (!in.readable()) {
callDecode(ctx);
}
ChannelBuffer out = ctx.nextIn().byteBuffer();
ChannelBuffer out = ctx.nextInboundByteBuffer();
int oldOutSize = out.readableBytes();
try {
decodeLast(ctx, in, out);
@ -47,8 +47,8 @@ public abstract class StreamToStreamDecoder extends ChannelInboundHandlerAdapter
}
private void callDecode(ChannelInboundHandlerContext<Byte> ctx) {
ChannelBuffer in = ctx.in().byteBuffer();
ChannelBuffer out = ctx.nextIn().byteBuffer();
ChannelBuffer in = ctx.inbound().byteBuffer();
ChannelBuffer out = ctx.nextInboundByteBuffer();
int oldOutSize = out.readableBytes();
while (in.readable()) {

View File

@ -17,8 +17,8 @@ public abstract class StreamToStreamEncoder extends ChannelOutboundHandlerAdapte
@Override
public void flush(ChannelOutboundHandlerContext<Byte> ctx, ChannelFuture future) throws Exception {
ChannelBuffer in = ctx.prevOut().byteBuffer();
ChannelBuffer out = ctx.out().byteBuffer();
ChannelBuffer in = ctx.outbound().byteBuffer();
ChannelBuffer out = ctx.nextOutboundByteBuffer();
int oldOutSize = out.readableBytes();
while (in.readable()) {

View File

@ -57,7 +57,7 @@ public class DecoderEmbedder<E> extends AbstractCodecEmbedder<E> {
@Override
public boolean offer(Object input) {
ChannelBufferHolder<Object> in = pipeline().nextIn();
ChannelBufferHolder<Object> in = pipeline().inbound();
if (in.hasByteBuffer()) {
in.byteBuffer().writeBytes((ChannelBuffer) input);
} else {

View File

@ -34,7 +34,6 @@ public class DiscardClientHandler extends ChannelInboundStreamHandlerAdapter {
private final byte[] content;
private ChannelInboundHandlerContext<Byte> ctx;
private ChannelBuffer out;
public DiscardClientHandler(int messageSize) {
if (messageSize <= 0) {
@ -49,7 +48,6 @@ public class DiscardClientHandler extends ChannelInboundStreamHandlerAdapter {
public void channelActive(ChannelInboundHandlerContext<Byte> ctx)
throws Exception {
this.ctx = ctx;
out = ctx.out().byteBuffer();
// Send the initial messages.
generateTraffic();
}
@ -77,6 +75,7 @@ public class DiscardClientHandler extends ChannelInboundStreamHandlerAdapter {
private void generateTraffic() {
// Fill the outbound buffer up to 64KiB
ChannelBuffer out = ctx.nextOutboundByteBuffer();
while (out.readableBytes() < 65536) {
out.writeBytes(content);
}
@ -90,7 +89,7 @@ public class DiscardClientHandler extends ChannelInboundStreamHandlerAdapter {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
out.clear();
ctx.nextOutboundByteBuffer().discardReadBytes();
generateTraffic();
}
}

View File

@ -34,7 +34,7 @@ public class DiscardServerHandler extends ChannelInboundStreamHandlerAdapter {
public void inboundBufferUpdated(ChannelInboundHandlerContext<Byte> ctx)
throws Exception {
// Discard the received data silently.
ctx.in().byteBuffer().clear();
ctx.inbound().byteBuffer().clear();
}

View File

@ -62,8 +62,8 @@ public class EchoClientHandler extends ChannelInboundHandlerAdapter<Byte> {
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<Byte> ctx) {
ChannelBuffer in = ctx.in().byteBuffer();
ChannelBuffer out = ctx.out().byteBuffer();
ChannelBuffer in = ctx.inbound().byteBuffer();
ChannelBuffer out = ctx.nextOutboundByteBuffer();
out.discardReadBytes();
out.writeBytes(in);
in.discardReadBytes();

View File

@ -39,8 +39,8 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter<Byte> {
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<Byte> ctx) {
ChannelBuffer in = ctx.in().byteBuffer();
ChannelBuffer out = ctx.out().byteBuffer();
ChannelBuffer in = ctx.inbound().byteBuffer();
ChannelBuffer out = ctx.nextOutboundByteBuffer();
out.discardReadBytes();
out.writeBytes(in);
in.discardReadBytes();

View File

@ -40,7 +40,6 @@ public class FactorialClientHandler extends ChannelInboundMessageHandlerAdapter<
FactorialClientHandler.class.getName());
private ChannelInboundHandlerContext<BigInteger> ctx;
private Queue<Object> out;
private int i = 1;
private int receivedMessages;
private final int count;
@ -68,7 +67,6 @@ public class FactorialClientHandler extends ChannelInboundMessageHandlerAdapter<
@Override
public void channelActive(ChannelInboundHandlerContext<BigInteger> ctx) {
this.ctx = ctx;
out = ctx.out().messageBuffer();
sendNumbers();
}
@ -101,6 +99,7 @@ public class FactorialClientHandler extends ChannelInboundMessageHandlerAdapter<
private void sendNumbers() {
// Do not send more than 4096 numbers.
boolean finished = false;
Queue<Object> out = ctx.nextOutboundMessageBuffer();
while (out.size() < 4096) {
if (i <= count) {
out.add(Integer.valueOf(i));

View File

@ -352,7 +352,7 @@ public class LoggingHandler extends ChannelHandlerAdapter<Object, Object> {
public void inboundBufferUpdated(ChannelInboundHandlerContext<Object> ctx)
throws Exception {
if (getLogger().isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, formatBuffer("INBUF", ctx.in())));
logger.log(internalLevel, format(ctx, formatBuffer("INBUF", ctx.inbound())));
}
ctx.fireInboundBufferUpdated();
}
@ -407,7 +407,7 @@ public class LoggingHandler extends ChannelHandlerAdapter<Object, Object> {
public void flush(ChannelOutboundHandlerContext<Object> ctx,
ChannelFuture future) throws Exception {
if (getLogger().isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, formatBuffer("OUTBUF", ctx.prevOut())));
logger.log(internalLevel, format(ctx, formatBuffer("OUTBUF", ctx.outbound())));
}
ctx.flush(future);
}

View File

@ -259,8 +259,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
@Override
public ChannelBufferHolder<Object> out() {
return pipeline().out();
public ChannelBufferHolder<Object> outbound() {
return pipeline().outbound();
}
@Override

View File

@ -37,7 +37,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
}
@Override
public ChannelBufferHolder<Object> out() {
public ChannelBufferHolder<Object> outbound() {
return ChannelBufferHolders.discardBuffer();
}

View File

@ -136,6 +136,8 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu
boolean isRegistered();
boolean isActive();
ChannelBufferHolder<Object> outbound();
/**
* Returns the local address where this channel is bound to. The returned
* {@link SocketAddress} is supposed to be down-cast into more concrete

View File

@ -59,9 +59,9 @@ public final class ChannelBufferHolder<E> {
case 0:
return msgBuf != null;
case 1:
return ctx.nextIn().hasMessageBuffer();
return ctx.nextInboundMessageBuffer() != null;
case 2:
return ctx.out().hasMessageBuffer();
return ctx.nextOutboundMessageBuffer() != null;
default:
throw new Error();
}
@ -72,15 +72,14 @@ public final class ChannelBufferHolder<E> {
case 0:
return byteBuf != null;
case 1:
return ctx.nextIn().hasByteBuffer();
return ctx.nextInboundByteBuffer() != null;
case 2:
return ctx.out().hasByteBuffer();
return ctx.nextOutboundByteBuffer() != null;
default:
throw new Error();
}
}
@SuppressWarnings("unchecked")
public Queue<E> messageBuffer() {
switch (bypassDirection) {
case 0:
@ -89,9 +88,9 @@ public final class ChannelBufferHolder<E> {
}
return msgBuf;
case 1:
return (Queue<E>) ctx.nextIn().messageBuffer();
return (Queue<E>) ctx.nextInboundMessageBuffer();
case 2:
return (Queue<E>) ctx.out().messageBuffer();
return (Queue<E>) ctx.nextOutboundMessageBuffer();
default:
throw new Error();
}
@ -105,9 +104,9 @@ public final class ChannelBufferHolder<E> {
}
return byteBuf;
case 1:
return ctx.nextIn().byteBuffer();
return ctx.nextInboundByteBuffer();
case 2:
return ctx.out().byteBuffer();
return ctx.nextOutboundByteBuffer();
default:
throw new Error();
}
@ -118,14 +117,18 @@ public final class ChannelBufferHolder<E> {
switch (bypassDirection) {
case 0:
if (msgBuf != null) {
return msgBuf.toString();
if (byteBuf != null) {
return "CatchAllBuffer";
} else {
return "MessageBuffer(" + msgBuf.size() + ')';
}
} else {
return byteBuf.toString();
}
case 1:
return ctx.nextIn().toString();
return "InboundBypassBuffer";
case 2:
return ctx.out().toString();
return "OutboundBypassBuffer";
default:
throw new Error();
}
@ -140,9 +143,8 @@ public final class ChannelBufferHolder<E> {
return byteBuf.readableBytes();
}
case 1:
return ctx.nextIn().size();
case 2:
return ctx.out().size();
throw new UnsupportedOperationException();
default:
throw new Error();
}
@ -157,9 +159,8 @@ public final class ChannelBufferHolder<E> {
return byteBuf.readable();
}
case 1:
return ctx.nextIn().isEmpty();
case 2:
return ctx.out().isEmpty();
throw new UnsupportedOperationException();
default:
throw new Error();
}

View File

@ -16,9 +16,11 @@
package io.netty.channel;
import io.netty.buffer.ChannelBuffer;
import io.netty.util.AttributeMap;
import java.nio.channels.Channels;
import java.util.Queue;
/**
* Enables a {@link ChannelHandler} to interact with its {@link ChannelPipeline}
@ -133,4 +135,10 @@ public interface ChannelHandlerContext
boolean canHandleInbound();
boolean canHandleOutbound();
ChannelBuffer nextInboundByteBuffer();
Queue<Object> nextInboundMessageBuffer();
ChannelBuffer nextOutboundByteBuffer();
Queue<Object> nextOutboundMessageBuffer();
}

View File

@ -61,14 +61,14 @@ public abstract class ChannelInboundHandlerAdapter<I> implements ChannelInboundH
}
static <I> void inboundBufferUpdated0(ChannelInboundHandlerContext<I> ctx) {
if (ctx.in().isBypass()) {
if (ctx.inbound().isBypass()) {
ctx.fireInboundBufferUpdated();
return;
}
if (ctx.in().hasMessageBuffer()) {
Queue<I> in = ctx.in().messageBuffer();
Queue<Object> nextIn = ctx.nextIn().messageBuffer();
if (ctx.inbound().hasMessageBuffer()) {
Queue<I> in = ctx.inbound().messageBuffer();
Queue<Object> nextIn = ctx.nextInboundMessageBuffer();
for (;;) {
I msg = in.poll();
if (msg == null) {
@ -77,8 +77,8 @@ public abstract class ChannelInboundHandlerAdapter<I> implements ChannelInboundH
nextIn.add(msg);
}
} else {
ChannelBuffer in = ctx.in().byteBuffer();
ChannelBuffer nextIn = ctx.nextIn().byteBuffer();
ChannelBuffer in = ctx.inbound().byteBuffer();
ChannelBuffer nextIn = ctx.nextInboundByteBuffer();
nextIn.writeBytes(in);
in.discardReadBytes();
}

View File

@ -1,5 +1,5 @@
package io.netty.channel;
public interface ChannelInboundHandlerContext<I> extends ChannelHandlerContext {
ChannelBufferHolder<I> in();
ChannelBufferHolder<I> inbound();
}

View File

@ -2,8 +2,6 @@ package io.netty.channel;
public interface ChannelInboundInvoker {
ChannelBufferHolder<Object> nextIn();
void fireChannelRegistered();
void fireChannelUnregistered();
void fireChannelActive();

View File

@ -14,7 +14,7 @@ public class ChannelInboundMessageHandlerAdapter<I> extends
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<I> ctx)
throws Exception {
Queue<I> in = ctx.in().messageBuffer();
Queue<I> in = ctx.inbound().messageBuffer();
for (;;) {
I msg = in.poll();
if (msg == null) {
@ -30,6 +30,6 @@ public class ChannelInboundMessageHandlerAdapter<I> extends
}
public void messageReceived(ChannelInboundHandlerContext<I> ctx, I msg) throws Exception {
ctx.nextIn().messageBuffer().add(msg);
ctx.nextInboundMessageBuffer().add(msg);
}
}

View File

@ -57,14 +57,14 @@ public abstract class ChannelOutboundHandlerAdapter<O> implements ChannelOutboun
}
static <O> void flush0(ChannelOutboundHandlerContext<O> ctx, ChannelFuture future) {
if (ctx.prevOut().isBypass()) {
if (ctx.outbound().isBypass()) {
ctx.flush(future);
return;
}
if (ctx.prevOut().hasMessageBuffer()) {
Queue<O> out = ctx.prevOut().messageBuffer();
Queue<Object> nextOut = ctx.out().messageBuffer();
if (ctx.outbound().hasMessageBuffer()) {
Queue<O> out = ctx.outbound().messageBuffer();
Queue<Object> nextOut = ctx.nextOutboundMessageBuffer();
for (;;) {
O msg = out.poll();
if (msg == null) {
@ -73,8 +73,8 @@ public abstract class ChannelOutboundHandlerAdapter<O> implements ChannelOutboun
nextOut.add(msg);
}
} else {
ChannelBuffer out = ctx.prevOut().byteBuffer();
ChannelBuffer nextOut = ctx.out().byteBuffer();
ChannelBuffer out = ctx.outbound().byteBuffer();
ChannelBuffer nextOut = ctx.nextOutboundByteBuffer();
nextOut.writeBytes(out);
out.discardReadBytes();
}

View File

@ -2,5 +2,5 @@ package io.netty.channel;
public interface ChannelOutboundHandlerContext<O> extends ChannelHandlerContext {
ChannelBufferHolder<O> prevOut();
ChannelBufferHolder<O> outbound();
}

View File

@ -3,8 +3,6 @@ package io.netty.channel;
import java.net.SocketAddress;
public interface ChannelOutboundInvoker {
ChannelBufferHolder<Object> out();
ChannelFuture bind(SocketAddress localAddress);
ChannelFuture connect(SocketAddress remoteAddress);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);

View File

@ -205,6 +205,9 @@ import java.util.NoSuchElementException;
*/
public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker {
ChannelBufferHolder<Object> inbound();
ChannelBufferHolder<Object> outbound();
/**
* Inserts a {@link ChannelHandler} at the first position of this pipeline.
*

View File

@ -27,6 +27,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
/**
* The default {@link ChannelPipeline} implementation. It is usually created
@ -570,19 +571,19 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public ChannelBufferHolder<Object> nextIn() {
public ChannelBufferHolder<Object> inbound() {
DefaultChannelHandlerContext ctx = firstInboundContext();
if (ctx != null) {
return ctx.in();
return ctx.inbound();
}
return null;
}
@Override
public ChannelBufferHolder<Object> out() {
public ChannelBufferHolder<Object> outbound() {
DefaultChannelHandlerContext ctx = firstOutboundContext();
if (ctx != null) {
return ctx.prevOut();
return ctx.outbound();
}
return channel().unsafe().out();
}
@ -903,11 +904,12 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
validateFuture(future);
if (out().hasMessageBuffer()) {
out().messageBuffer().add(message);
ChannelBufferHolder<Object> out = outbound();
if (out.hasMessageBuffer()) {
out.messageBuffer().add(message);
} else if (message instanceof ChannelBuffer) {
ChannelBuffer m = (ChannelBuffer) message;
out().byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes());
out.byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes());
} else {
throw new IllegalArgumentException(
"cannot write a message whose type is not " +
@ -1057,7 +1059,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private final boolean canHandleInbound;
private final boolean canHandleOutbound;
private final ChannelBufferHolder<Object> in;
private final ChannelBufferHolder<Object> prevOut;
private final ChannelBufferHolder<Object> out;
@SuppressWarnings("unchecked")
DefaultChannelHandlerContext(
@ -1096,7 +1098,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
if (canHandleOutbound) {
try {
prevOut = ((ChannelOutboundHandler<Object>) handler).newOutboundBuffer(this);
out = ((ChannelOutboundHandler<Object>) handler).newOutboundBuffer(this);
} catch (Exception e) {
throw new ChannelPipelineException("A user handler failed to create a new outbound buffer.", e);
} finally {
@ -1105,7 +1107,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
}
} else {
prevOut = null;
out = null;
}
}
@ -1140,32 +1142,82 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public ChannelBufferHolder<Object> in() {
public ChannelBufferHolder<Object> inbound() {
return in;
}
@Override
public ChannelBufferHolder<Object> prevOut() {
return prevOut;
public ChannelBufferHolder<Object> outbound() {
return out;
}
@Override
public ChannelBufferHolder<Object> nextIn() {
DefaultChannelHandlerContext next = nextInboundContext(this.next);
if (next != null) {
return next.in();
} else {
throw new NoSuchElementException("no inbound buffer in the rest of the pipeline");
public ChannelBuffer nextInboundByteBuffer() {
DefaultChannelHandlerContext ctx = this;
for (;;) {
ctx = nextInboundContext(ctx.next);
if (ctx == null) {
return null;
}
ChannelBufferHolder<Object> nextIn = ctx.inbound();
if (nextIn.hasByteBuffer()) {
return nextIn.byteBuffer();
}
}
}
@Override
public ChannelBufferHolder<Object> out() {
DefaultChannelHandlerContext next = nextOutboundContext(prev);
if (next != null) {
return next.prevOut();
} else {
return channel().unsafe().out();
public Queue<Object> nextInboundMessageBuffer() {
DefaultChannelHandlerContext ctx = this;
for (;;) {
ctx = nextInboundContext(ctx.next);
if (ctx == null) {
return null;
}
ChannelBufferHolder<Object> nextIn = ctx.inbound();
if (nextIn.hasMessageBuffer()) {
return nextIn.messageBuffer();
}
}
}
@Override
public ChannelBuffer nextOutboundByteBuffer() {
DefaultChannelHandlerContext ctx = this;
for (;;) {
ctx = nextOutboundContext(ctx.prev);
if (ctx == null) {
ChannelBufferHolder<Object> lastOut = channel().unsafe().out();
if (lastOut.hasByteBuffer()) {
return lastOut.byteBuffer();
} else {
return null;
}
}
ChannelBufferHolder<Object> nextOut = ctx.outbound();
if (nextOut.hasByteBuffer()) {
return nextOut.byteBuffer();
}
}
}
@Override
public Queue<Object> nextOutboundMessageBuffer() {
DefaultChannelHandlerContext ctx = this;
for (;;) {
ctx = nextOutboundContext(ctx.prev);
if (ctx == null) {
ChannelBufferHolder<Object> lastOut = channel().unsafe().out();
if (lastOut.hasMessageBuffer()) {
return lastOut.messageBuffer();
} else {
return null;
}
}
ChannelBufferHolder<Object> nextOut = ctx.outbound();
if (nextOut.hasMessageBuffer()) {
return nextOut.messageBuffer();
}
}
}
@ -1304,9 +1356,9 @@ public class DefaultChannelPipeline implements ChannelPipeline {
public ChannelFuture write(Object message, ChannelFuture future) {
if (message instanceof ChannelBuffer) {
ChannelBuffer m = (ChannelBuffer) message;
out().byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes());
nextOutboundByteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes());
} else {
out().messageBuffer().add(message);
nextOutboundMessageBuffer().add(message);
}
return flush(future);
}

View File

@ -180,7 +180,7 @@ public class ServerChannelBootstrap {
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<Channel> ctx) {
Queue<Channel> in = ctx.in().messageBuffer();
Queue<Channel> in = ctx.inbound().messageBuffer();
for (;;) {
Channel child = in.poll();
if (child == null) {

View File

@ -34,7 +34,7 @@ abstract class AbstractNioMessageChannel extends AbstractNioChannel {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final ChannelBufferHolder<Object> buf = pipeline.nextIn();
final ChannelBufferHolder<Object> buf = pipeline.inbound();
boolean closed = false;
boolean read = false;
try {

View File

@ -36,7 +36,7 @@ abstract class AbstractNioStreamChannel extends AbstractNioChannel {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final ChannelBufferHolder<Object> buf = pipeline.nextIn();
final ChannelBufferHolder<Object> buf = pipeline.inbound();
boolean closed = false;
boolean read = false;
try {

View File

@ -32,7 +32,7 @@ abstract class AbstractOioMessageChannel extends AbstractOioChannel {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final ChannelBufferHolder<Object> buf = pipeline.nextIn();
final ChannelBufferHolder<Object> buf = pipeline.inbound();
boolean closed = false;
boolean read = false;
try {

View File

@ -33,7 +33,7 @@ abstract class AbstractOioStreamChannel extends AbstractOioChannel {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final ChannelBufferHolder<Object> buf = pipeline.nextIn();
final ChannelBufferHolder<Object> buf = pipeline.inbound();
boolean closed = false;
boolean read = false;
try {