From 01a5bd41f0209289e4117c538248bda7c0bc9aae Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Sat, 2 Jun 2012 01:58:15 -0700 Subject: [PATCH] Add Channel.type() which tells if stream-oriented or message-oriented - DefaultChannelPipeline uses this information to reject invalid buffer access in inbound(Message|Byte)Buffer. Otherwise, a user can access a message buffer when the channel is stream-oriented. - Because ChannelType cannot be both STREAM and MESSAGE, catch-all buffer has been removed to avoid confusion and unexpected behavior (it's already causing headache.) - As a result, codec embedder needs rework. --- .../netty/channel/AbstractServerChannel.java | 7 ++++++- .../main/java/io/netty/channel/Channel.java | 2 ++ .../io/netty/channel/ChannelBufferHolder.java | 19 +------------------ .../netty/channel/ChannelBufferHolders.java | 19 +++++++++---------- .../java/io/netty/channel/ChannelType.java | 6 ++++++ .../netty/channel/DefaultChannelPipeline.java | 8 ++++++++ .../io/netty/channel/local/LocalChannel.java | 6 ++++++ .../socket/nio/AbstractNioMessageChannel.java | 6 ++++++ .../socket/nio/AbstractNioStreamChannel.java | 6 ++++++ .../socket/nio/NioServerSocketChannel.java | 3 ++- .../socket/oio/AbstractOioMessageChannel.java | 6 ++++++ .../socket/oio/AbstractOioStreamChannel.java | 6 ++++++ .../socket/oio/OioServerSocketChannel.java | 2 +- 13 files changed, 65 insertions(+), 31 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/ChannelType.java diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index 0605021291..b82f988e3e 100644 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -36,7 +36,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S * Creates a new instance. */ protected AbstractServerChannel(Integer id) { - super(null, id, ChannelBufferHolders.discardBuffer()); + super(null, id, ChannelBufferHolders.discardMessageBuffer()); } @Override @@ -49,6 +49,11 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S throw new NoSuchBufferException(); } + @Override + public ChannelType type() { + return ChannelType.MESSAGE; + } + @Override public SocketAddress remoteAddress() { return null; diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index 0620dbad2f..3d407fee1d 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -113,6 +113,8 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu */ Integer id(); + ChannelType type(); + EventLoop eventLoop(); /** diff --git a/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java b/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java index de603e952b..30bc36de2d 100644 --- a/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java +++ b/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java @@ -43,19 +43,6 @@ public final class ChannelBufferHolder { this.byteBuf = byteBuf; } - ChannelBufferHolder(Queue msgBuf, ChannelBuffer byteBuf) { - if (msgBuf == null) { - throw new NullPointerException("msgBuf"); - } - if (byteBuf == null) { - throw new NullPointerException("byteBuf"); - } - ctx = null; - bypassDirection = 0; - this.msgBuf = msgBuf; - this.byteBuf = byteBuf; - } - public boolean isBypass() { return bypassDirection != 0; } @@ -145,11 +132,7 @@ public final class ChannelBufferHolder { switch (bypassDirection) { case 0: if (msgBuf != null) { - if (byteBuf != null) { - return "CatchAllBuffer"; - } else { - return "MessageBuffer(" + msgBuf.size() + ')'; - } + return "MessageBuffer(" + msgBuf.size() + ')'; } else { return byteBuf.toString(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelBufferHolders.java b/transport/src/main/java/io/netty/channel/ChannelBufferHolders.java index 4d5b549ce2..74b1c8eaf3 100644 --- a/transport/src/main/java/io/netty/channel/ChannelBufferHolders.java +++ b/transport/src/main/java/io/netty/channel/ChannelBufferHolders.java @@ -21,8 +21,10 @@ import java.util.Queue; public final class ChannelBufferHolders { - private static final ChannelBufferHolder DISCARD_BUFFER = - new ChannelBufferHolder(new NoopQueue(), new NoopByteBuf()); + private static final ChannelBufferHolder DISCARD_MESSAGE_BUFFER = + new ChannelBufferHolder(new NoopQueue()); + private static final ChannelBufferHolder DISCARD_BYTE_BUFFER = + new ChannelBufferHolder(new NoopByteBuf()); public static ChannelBufferHolder messageBuffer() { return messageBuffer(new ArrayDeque()); @@ -49,17 +51,14 @@ public final class ChannelBufferHolders { return new ChannelBufferHolder(ctx, false); } - public static ChannelBufferHolder catchAllBuffer() { - return catchAllBuffer(new ArrayDeque(), ChannelBuffers.dynamicBuffer()); - } - - public static ChannelBufferHolder catchAllBuffer(Queue msgBuf, ChannelBuffer byteBuf) { - return new ChannelBufferHolder(msgBuf, byteBuf); + @SuppressWarnings("unchecked") + public static ChannelBufferHolder discardMessageBuffer() { + return (ChannelBufferHolder) DISCARD_MESSAGE_BUFFER; } @SuppressWarnings("unchecked") - public static ChannelBufferHolder discardBuffer() { - return (ChannelBufferHolder) DISCARD_BUFFER; + public static ChannelBufferHolder discardByteBuffer() { + return (ChannelBufferHolder) DISCARD_BYTE_BUFFER; } private ChannelBufferHolders() { diff --git a/transport/src/main/java/io/netty/channel/ChannelType.java b/transport/src/main/java/io/netty/channel/ChannelType.java new file mode 100644 index 0000000000..98f2224025 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelType.java @@ -0,0 +1,6 @@ +package io.netty.channel; + +public enum ChannelType { + STREAM, + MESSAGE; +} diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 8d22b09969..dfc1e93597 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -625,11 +625,19 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public Queue inboundMessageBuffer() { + if (channel.type() != ChannelType.MESSAGE) { + throw new NoSuchBufferException( + "The first inbound buffer of this channel must be a message buffer."); + } return nextInboundMessageBuffer(head); } @Override public ChannelBuffer inboundByteBuffer() { + if (channel.type() != ChannelType.STREAM) { + throw new NoSuchBufferException( + "The first inbound buffer of this channel must be a byte buffer."); + } return nextInboundByteBuffer(head); } diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index 0880a38ce0..6979b6da75 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -22,6 +22,7 @@ import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelType; import io.netty.channel.DefaultChannelConfig; import io.netty.channel.EventLoop; import io.netty.channel.SingleThreadEventLoop; @@ -67,6 +68,11 @@ public class LocalChannel extends AbstractChannel { remoteAddress = peer.localAddress(); } + @Override + public ChannelType type() { + return ChannelType.MESSAGE; + } + @Override public ChannelConfig config() { return config; diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java index 25ac0ac299..a4190d7298 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java @@ -3,6 +3,7 @@ package io.netty.channel.socket.nio; import io.netty.channel.Channel; import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelType; import java.io.IOException; import java.nio.channels.SelectableChannel; @@ -16,6 +17,11 @@ abstract class AbstractNioMessageChannel extends AbstractNioChannel { super(parent, id, outboundBuffer, ch, defaultInterestOps); } + @Override + public ChannelType type() { + return ChannelType.MESSAGE; + } + @Override protected Unsafe newUnsafe() { return new NioMessageUnsafe(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java index c577438166..3f1935bd55 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java @@ -5,6 +5,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelType; import java.io.IOException; import java.nio.channels.SelectableChannel; @@ -17,6 +18,11 @@ abstract class AbstractNioStreamChannel extends AbstractNioChannel { super(parent, id, ChannelBufferHolders.byteBuffer(), ch, SelectionKey.OP_READ); } + @Override + public ChannelType type() { + return ChannelType.STREAM; + } + @Override protected Unsafe newUnsafe() { return new NioStreamUnsafe(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index 958cea0e9c..648eebe2ae 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -42,7 +42,8 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel private final ServerSocketChannelConfig config; public NioServerSocketChannel() { - super(null, null, ChannelBufferHolders.discardBuffer(), newSocket(), SelectionKey.OP_ACCEPT); + super(null, null, ChannelBufferHolders.discardMessageBuffer(), + newSocket(), SelectionKey.OP_ACCEPT); config = new DefaultServerSocketChannelConfig(javaChannel().socket()); } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java index 6ce7d41fc9..aa1ffd1395 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java @@ -3,6 +3,7 @@ package io.netty.channel.socket.oio; import io.netty.channel.Channel; import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelType; import java.io.IOException; import java.util.Queue; @@ -14,6 +15,11 @@ abstract class AbstractOioMessageChannel extends AbstractOioChannel { super(parent, id, outboundBuffer); } + @Override + public ChannelType type() { + return ChannelType.MESSAGE; + } + @Override protected Unsafe newUnsafe() { return new OioMessageUnsafe(); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java index 411f087ab3..cf79d2cedf 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java @@ -5,6 +5,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelType; import java.io.IOException; @@ -14,6 +15,11 @@ abstract class AbstractOioStreamChannel extends AbstractOioChannel { super(parent, id, ChannelBufferHolders.byteBuffer()); } + @Override + public ChannelType type() { + return ChannelType.STREAM; + } + @Override protected Unsafe newUnsafe() { return new OioStreamUnsafe(); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java index 2252750bc2..7b16c529d5 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java @@ -60,7 +60,7 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel } public OioServerSocketChannel(Integer id, ServerSocket socket) { - super(null, id, ChannelBufferHolders.discardBuffer()); + super(null, id, ChannelBufferHolders.discardMessageBuffer()); if (socket == null) { throw new NullPointerException("socket"); }