Add Channel.type() which tells if stream-oriented or message-oriented

- DefaultChannelPipeline uses this information to reject invalid buffer
  access in inbound(Message|Byte)Buffer.  Otherwise, a user can access
  a message buffer when the channel is stream-oriented.
- Because ChannelType cannot be both STREAM and MESSAGE, catch-all
  buffer has been removed to avoid confusion and unexpected behavior
  (it's already causing headache.)
- As a result, codec embedder needs rework.
This commit is contained in:
Trustin Lee 2012-06-02 01:58:15 -07:00
parent 45f19d02ff
commit 01a5bd41f0
13 changed files with 65 additions and 31 deletions

View File

@ -36,7 +36,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
* Creates a new instance.
*/
protected AbstractServerChannel(Integer id) {
super(null, id, ChannelBufferHolders.discardBuffer());
super(null, id, ChannelBufferHolders.discardMessageBuffer());
}
@Override
@ -49,6 +49,11 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
throw new NoSuchBufferException();
}
@Override
public ChannelType type() {
return ChannelType.MESSAGE;
}
@Override
public SocketAddress remoteAddress() {
return null;

View File

@ -113,6 +113,8 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu
*/
Integer id();
ChannelType type();
EventLoop eventLoop();
/**

View File

@ -43,19 +43,6 @@ public final class ChannelBufferHolder<E> {
this.byteBuf = byteBuf;
}
ChannelBufferHolder(Queue<E> msgBuf, ChannelBuffer byteBuf) {
if (msgBuf == null) {
throw new NullPointerException("msgBuf");
}
if (byteBuf == null) {
throw new NullPointerException("byteBuf");
}
ctx = null;
bypassDirection = 0;
this.msgBuf = msgBuf;
this.byteBuf = byteBuf;
}
public boolean isBypass() {
return bypassDirection != 0;
}
@ -145,11 +132,7 @@ public final class ChannelBufferHolder<E> {
switch (bypassDirection) {
case 0:
if (msgBuf != null) {
if (byteBuf != null) {
return "CatchAllBuffer";
} else {
return "MessageBuffer(" + msgBuf.size() + ')';
}
return "MessageBuffer(" + msgBuf.size() + ')';
} else {
return byteBuf.toString();
}

View File

@ -21,8 +21,10 @@ import java.util.Queue;
public final class ChannelBufferHolders {
private static final ChannelBufferHolder<Object> DISCARD_BUFFER =
new ChannelBufferHolder<Object>(new NoopQueue<Object>(), new NoopByteBuf());
private static final ChannelBufferHolder<Object> DISCARD_MESSAGE_BUFFER =
new ChannelBufferHolder<Object>(new NoopQueue<Object>());
private static final ChannelBufferHolder<Byte> DISCARD_BYTE_BUFFER =
new ChannelBufferHolder<Byte>(new NoopByteBuf());
public static <E> ChannelBufferHolder<E> messageBuffer() {
return messageBuffer(new ArrayDeque<E>());
@ -49,17 +51,14 @@ public final class ChannelBufferHolders {
return new ChannelBufferHolder<E>(ctx, false);
}
public static <E> ChannelBufferHolder<E> catchAllBuffer() {
return catchAllBuffer(new ArrayDeque<E>(), ChannelBuffers.dynamicBuffer());
}
public static <E> ChannelBufferHolder<E> catchAllBuffer(Queue<E> msgBuf, ChannelBuffer byteBuf) {
return new ChannelBufferHolder<E>(msgBuf, byteBuf);
@SuppressWarnings("unchecked")
public static <E> ChannelBufferHolder<E> discardMessageBuffer() {
return (ChannelBufferHolder<E>) DISCARD_MESSAGE_BUFFER;
}
@SuppressWarnings("unchecked")
public static <E> ChannelBufferHolder<E> discardBuffer() {
return (ChannelBufferHolder<E>) DISCARD_BUFFER;
public static <E> ChannelBufferHolder<E> discardByteBuffer() {
return (ChannelBufferHolder<E>) DISCARD_BYTE_BUFFER;
}
private ChannelBufferHolders() {

View File

@ -0,0 +1,6 @@
package io.netty.channel;
public enum ChannelType {
STREAM,
MESSAGE;
}

View File

@ -625,11 +625,19 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public Queue<Object> inboundMessageBuffer() {
if (channel.type() != ChannelType.MESSAGE) {
throw new NoSuchBufferException(
"The first inbound buffer of this channel must be a message buffer.");
}
return nextInboundMessageBuffer(head);
}
@Override
public ChannelBuffer inboundByteBuffer() {
if (channel.type() != ChannelType.STREAM) {
throw new NoSuchBufferException(
"The first inbound buffer of this channel must be a byte buffer.");
}
return nextInboundByteBuffer(head);
}

View File

@ -22,6 +22,7 @@ import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelType;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventLoop;
@ -67,6 +68,11 @@ public class LocalChannel extends AbstractChannel {
remoteAddress = peer.localAddress();
}
@Override
public ChannelType type() {
return ChannelType.MESSAGE;
}
@Override
public ChannelConfig config() {
return config;

View File

@ -3,6 +3,7 @@ package io.netty.channel.socket.nio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelType;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
@ -16,6 +17,11 @@ abstract class AbstractNioMessageChannel extends AbstractNioChannel {
super(parent, id, outboundBuffer, ch, defaultInterestOps);
}
@Override
public ChannelType type() {
return ChannelType.MESSAGE;
}
@Override
protected Unsafe newUnsafe() {
return new NioMessageUnsafe();

View File

@ -5,6 +5,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelType;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
@ -17,6 +18,11 @@ abstract class AbstractNioStreamChannel extends AbstractNioChannel {
super(parent, id, ChannelBufferHolders.byteBuffer(), ch, SelectionKey.OP_READ);
}
@Override
public ChannelType type() {
return ChannelType.STREAM;
}
@Override
protected Unsafe newUnsafe() {
return new NioStreamUnsafe();

View File

@ -42,7 +42,8 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel
private final ServerSocketChannelConfig config;
public NioServerSocketChannel() {
super(null, null, ChannelBufferHolders.discardBuffer(), newSocket(), SelectionKey.OP_ACCEPT);
super(null, null, ChannelBufferHolders.discardMessageBuffer(),
newSocket(), SelectionKey.OP_ACCEPT);
config = new DefaultServerSocketChannelConfig(javaChannel().socket());
}

View File

@ -3,6 +3,7 @@ package io.netty.channel.socket.oio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelType;
import java.io.IOException;
import java.util.Queue;
@ -14,6 +15,11 @@ abstract class AbstractOioMessageChannel extends AbstractOioChannel {
super(parent, id, outboundBuffer);
}
@Override
public ChannelType type() {
return ChannelType.MESSAGE;
}
@Override
protected Unsafe newUnsafe() {
return new OioMessageUnsafe();

View File

@ -5,6 +5,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelType;
import java.io.IOException;
@ -14,6 +15,11 @@ abstract class AbstractOioStreamChannel extends AbstractOioChannel {
super(parent, id, ChannelBufferHolders.byteBuffer());
}
@Override
public ChannelType type() {
return ChannelType.STREAM;
}
@Override
protected Unsafe newUnsafe() {
return new OioStreamUnsafe();

View File

@ -60,7 +60,7 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
}
public OioServerSocketChannel(Integer id, ServerSocket socket) {
super(null, id, ChannelBufferHolders.discardBuffer());
super(null, id, ChannelBufferHolders.discardMessageBuffer());
if (socket == null) {
throw new NullPointerException("socket");
}