ChannelType -> ChannelBufferType / Optimize DefaultChannelPipeline

- Channel.type() -> bufferType()
- Use simpler bit mask operation for pipeline traversal
This commit is contained in:
Trustin Lee 2012-06-09 09:44:30 +09:00
parent 5661bff062
commit 24e1f936a8
15 changed files with 108 additions and 79 deletions

View File

@ -50,8 +50,8 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
}
@Override
public ChannelType type() {
return ChannelType.MESSAGE;
public ChannelBufferType bufferType() {
return ChannelBufferType.MESSAGE;
}
@Override

View File

@ -113,8 +113,6 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu
*/
Integer id();
ChannelType type();
EventLoop eventLoop();
/**
@ -140,6 +138,8 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu
boolean isRegistered();
boolean isActive();
ChannelBufferType bufferType();
ChannelBuffer outboundByteBuffer();
<T> Queue<T> outboundMessageBuffer();

View File

@ -21,6 +21,7 @@ import java.util.Queue;
public final class ChannelBufferHolder<E> {
private final ChannelBufferType type;
private final Queue<E> msgBuf;
private final ChannelBuffer byteBuf;
@ -30,6 +31,7 @@ public final class ChannelBufferHolder<E> {
}
this.msgBuf = msgBuf;
byteBuf = null;
type = ChannelBufferType.MESSAGE;
}
@ -39,14 +41,11 @@ public final class ChannelBufferHolder<E> {
}
msgBuf = null;
this.byteBuf = byteBuf;
type = ChannelBufferType.STREAM;
}
public boolean hasMessageBuffer() {
return msgBuf != null;
}
public boolean hasByteBuffer() {
return byteBuf != null;
public ChannelBufferType type() {
return type;
}
public Queue<E> messageBuffer() {
@ -65,26 +64,35 @@ public final class ChannelBufferHolder<E> {
@Override
public String toString() {
if (msgBuf != null) {
switch (type) {
case MESSAGE:
return "MessageBuffer(" + msgBuf.size() + ')';
} else {
case STREAM:
return byteBuf.toString();
default:
throw new Error();
}
}
public int size() {
if (msgBuf != null) {
switch (type) {
case MESSAGE:
return msgBuf.size();
} else {
case STREAM:
return byteBuf.readableBytes();
default:
throw new Error();
}
}
public boolean isEmpty() {
if (msgBuf != null) {
switch (type) {
case MESSAGE:
return msgBuf.isEmpty();
} else {
case STREAM:
return !byteBuf.readable();
default:
throw new Error();
}
}
}

View File

@ -15,7 +15,7 @@
*/
package io.netty.channel;
public enum ChannelType {
public enum ChannelBufferType {
STREAM,
MESSAGE;
}

View File

@ -17,8 +17,8 @@ package io.netty.channel;
public enum ChannelHandlerType {
STATE(0),
OPERATION(1),
INBOUND(0),
OPERATION(1),
OUTBOUND(1);
final int direction; // 0 - up (inbound), 1 - down (outbound)

View File

@ -79,7 +79,7 @@ import java.util.Queue;
* | /|\ . |
* | . . |
* | [ sendUpstream() ] [ sendDownstream() ] |
* | [ + INBOUND data ] [ + OUTBOUND data ] |
* | [ + VAL_INBOUND data ] [ + VAL_OUTBOUND data ] |
* | . . |
* | . \|/ |
* | +----------+-----------+ +-----------+------------+ |

View File

@ -34,13 +34,17 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private static final EnumSet<ChannelHandlerType> EMPTY_TYPE = EnumSet.noneOf(ChannelHandlerType.class);
static final int DIR_INBOUND = 0x00000001;
static final int DIR_OUTBOUND = 0x80000000;
volatile DefaultChannelHandlerContext next;
volatile DefaultChannelHandlerContext prev;
private final Channel channel;
private final DefaultChannelPipeline pipeline;
EventExecutor executor; // not thread-safe but OK because it never changes once set.
private final String name;
final Set<ChannelHandlerType> type;
private final Set<ChannelHandlerType> type;
final int directions;
private final ChannelHandler handler;
final Queue<Object> inMsgBuf;
@ -126,7 +130,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public void run() {
DefaultChannelHandlerContext next = nextContext(
DefaultChannelHandlerContext.this.next, ChannelHandlerType.STATE);
DefaultChannelHandlerContext.this.next, DIR_INBOUND);
if (next != null) {
next.fillBridge();
EventExecutor executor = next.executor();
@ -153,20 +157,24 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
// Determine the type of the specified handler.
int typeValue = 0;
EnumSet<ChannelHandlerType> type = EMPTY_TYPE.clone();
if (handler instanceof ChannelStateHandler) {
type.add(ChannelHandlerType.STATE);
}
if (handler instanceof ChannelInboundHandler) {
type.add(ChannelHandlerType.INBOUND);
}
if (handler instanceof ChannelOutboundHandler) {
type.add(ChannelHandlerType.OUTBOUND);
typeValue |= DIR_INBOUND;
if (handler instanceof ChannelInboundHandler) {
type.add(ChannelHandlerType.INBOUND);
}
}
if (handler instanceof ChannelOperationHandler) {
type.add(ChannelHandlerType.OPERATION);
typeValue |= DIR_OUTBOUND;
if (handler instanceof ChannelOutboundHandler) {
type.add(ChannelHandlerType.OUTBOUND);
}
}
this.type = Collections.unmodifiableSet(type);
this.directions = typeValue;
this.prev = prev;
this.next = next;
@ -199,16 +207,21 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
throw new ChannelPipelineException("A user handler failed to create a new inbound buffer.", e);
}
if (holder.hasByteBuffer()) {
switch (holder.type()) {
case STREAM:
inByteBuf = holder.byteBuffer();
inByteBridge = new AtomicReference<StreamBridge>();
inMsgBuf = null;
inMsgBridge = null;
} else {
break;
case MESSAGE:
inByteBuf = null;
inByteBridge = null;
inMsgBuf = holder.messageBuffer();
inMsgBridge = new AtomicReference<MessageBridge>();
break;
default:
throw new Error();
}
} else {
inByteBuf = null;
@ -225,16 +238,21 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
throw new ChannelPipelineException("A user handler failed to create a new outbound buffer.", e);
}
if (holder.hasByteBuffer()) {
switch (holder.type()) {
case STREAM:
outByteBuf = holder.byteBuffer();
outByteBridge = new AtomicReference<StreamBridge>();
outMsgBuf = null;
outMsgBridge = null;
} else {
break;
case MESSAGE:
outByteBuf = null;
outByteBridge = null;
outMsgBuf = holder.messageBuffer();
outMsgBridge = new AtomicReference<MessageBridge>();
break;
default:
throw new Error();
}
} else {
outByteBuf = null;
@ -487,7 +505,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public void fireChannelRegistered() {
DefaultChannelHandlerContext next = nextContext(this.next, ChannelHandlerType.STATE);
DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND);
if (next != null) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
@ -500,7 +518,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public void fireChannelUnregistered() {
DefaultChannelHandlerContext next = nextContext(this.next, ChannelHandlerType.STATE);
DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND);
if (next != null) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
@ -513,7 +531,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public void fireChannelActive() {
DefaultChannelHandlerContext next = nextContext(this.next, ChannelHandlerType.STATE);
DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND);
if (next != null) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
@ -526,7 +544,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public void fireChannelInactive() {
DefaultChannelHandlerContext next = nextContext(this.next, ChannelHandlerType.STATE);
DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND);
if (next != null) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
@ -650,7 +668,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelFuture future) {
return pipeline.bind(nextContext(prev, ChannelHandlerType.OPERATION), localAddress, future);
return pipeline.bind(nextContext(prev, DIR_OUTBOUND), localAddress, future);
}
@Override
@ -660,29 +678,29 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) {
return pipeline.connect(nextContext(prev, ChannelHandlerType.OPERATION), remoteAddress, localAddress, future);
return pipeline.connect(nextContext(prev, DIR_OUTBOUND), remoteAddress, localAddress, future);
}
@Override
public ChannelFuture disconnect(ChannelFuture future) {
return pipeline.disconnect(nextContext(prev, ChannelHandlerType.OPERATION), future);
return pipeline.disconnect(nextContext(prev, DIR_OUTBOUND), future);
}
@Override
public ChannelFuture close(ChannelFuture future) {
return pipeline.close(nextContext(prev, ChannelHandlerType.OPERATION), future);
return pipeline.close(nextContext(prev, DIR_OUTBOUND), future);
}
@Override
public ChannelFuture deregister(ChannelFuture future) {
return pipeline.deregister(nextContext(prev, ChannelHandlerType.OPERATION), future);
return pipeline.deregister(nextContext(prev, DIR_OUTBOUND), future);
}
@Override
public ChannelFuture flush(final ChannelFuture future) {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
DefaultChannelHandlerContext prev = nextContext(this.prev, ChannelHandlerType.OPERATION);
DefaultChannelHandlerContext prev = nextContext(this.prev, DIR_OUTBOUND);
prev.fillBridge();
pipeline.flush(prev, future);
} else {

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel;
import static io.netty.channel.DefaultChannelHandlerContext.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.DefaultChannelHandlerContext.MessageBridge;
import io.netty.channel.DefaultChannelHandlerContext.StreamBridge;
@ -883,7 +884,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public Queue<Object> inboundMessageBuffer() {
if (channel.type() != ChannelType.MESSAGE) {
if (channel.bufferType() != ChannelBufferType.MESSAGE) {
throw new NoSuchBufferException(
"The first inbound buffer of this channel must be a message buffer.");
}
@ -892,7 +893,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelBuffer inboundByteBuffer() {
if (channel.type() != ChannelType.STREAM) {
if (channel.bufferType() != ChannelBufferType.STREAM) {
throw new NoSuchBufferException(
"The first inbound buffer of this channel must be a byte buffer.");
}
@ -1075,7 +1076,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelFuture future) {
return bind(firstContext(ChannelHandlerType.OPERATION), localAddress, future);
return bind(firstContext(DIR_OUTBOUND), localAddress, future);
}
ChannelFuture bind(
@ -1110,7 +1111,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) {
return connect(firstContext(ChannelHandlerType.OPERATION), remoteAddress, localAddress, future);
return connect(firstContext(DIR_OUTBOUND), remoteAddress, localAddress, future);
}
ChannelFuture connect(
@ -1142,7 +1143,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelFuture disconnect(ChannelFuture future) {
return disconnect(firstContext(ChannelHandlerType.OPERATION), future);
return disconnect(firstContext(DIR_OUTBOUND), future);
}
ChannelFuture disconnect(final DefaultChannelHandlerContext ctx, final ChannelFuture future) {
@ -1168,7 +1169,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelFuture close(ChannelFuture future) {
return close(firstContext(ChannelHandlerType.OPERATION), future);
return close(firstContext(DIR_OUTBOUND), future);
}
ChannelFuture close(final DefaultChannelHandlerContext ctx, final ChannelFuture future) {
@ -1194,7 +1195,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelFuture deregister(final ChannelFuture future) {
return deregister(firstContext(ChannelHandlerType.OPERATION), future);
return deregister(firstContext(DIR_OUTBOUND), future);
}
ChannelFuture deregister(final DefaultChannelHandlerContext ctx, final ChannelFuture future) {
@ -1220,7 +1221,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelFuture flush(ChannelFuture future) {
return flush(firstContext(ChannelHandlerType.OPERATION), future);
return flush(firstContext(DIR_OUTBOUND), future);
}
ChannelFuture flush(final DefaultChannelHandlerContext ctx, final ChannelFuture future) {
@ -1326,30 +1327,32 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
}
private DefaultChannelHandlerContext firstContext(ChannelHandlerType type) {
if (type.direction == 0) {
return nextContext(head.next, type);
private DefaultChannelHandlerContext firstContext(int direction) {
assert direction == DIR_INBOUND || direction == DIR_OUTBOUND;
if (direction > 0) {
return nextContext(head.next, direction);
} else {
return nextContext(tail, type);
return nextContext(tail, direction);
}
}
static DefaultChannelHandlerContext nextContext(
DefaultChannelHandlerContext ctx, ChannelHandlerType type) {
DefaultChannelHandlerContext ctx, int direction) {
assert direction == DIR_INBOUND || direction == DIR_OUTBOUND;
if (ctx == null) {
return null;
}
DefaultChannelHandlerContext realCtx = ctx;
if (type.direction == 0) {
while (!realCtx.type.contains(type)) {
if (direction > 0) {
while ((realCtx.directions & direction) == 0) {
realCtx = realCtx.next;
if (realCtx == null) {
return null;
}
}
} else {
while (!realCtx.type.contains(type)) {
while ((realCtx.directions & direction) == 0) {
realCtx = realCtx.prev;
if (realCtx == null) {
return null;
@ -1430,7 +1433,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private final class HeadHandler implements ChannelOutboundHandler {
@Override
public ChannelBufferHolder newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
switch (channel.type()) {
switch (channel.bufferType()) {
case STREAM:
return ChannelBufferHolders.byteBuffer();
case MESSAGE:

View File

@ -16,7 +16,7 @@
package io.netty.channel.embedded;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelType;
import io.netty.channel.ChannelBufferType;
import java.util.ArrayDeque;
import java.util.Queue;
@ -28,8 +28,8 @@ public class EmbeddedMessageChannel extends AbstractEmbeddedChannel {
}
@Override
public ChannelType type() {
return ChannelType.MESSAGE;
public ChannelBufferType bufferType() {
return ChannelBufferType.MESSAGE;
}
public Queue<Object> inboundBuffer() {

View File

@ -18,7 +18,7 @@ package io.netty.channel.embedded;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelType;
import io.netty.channel.ChannelBufferType;
public class EmbeddedStreamChannel extends AbstractEmbeddedChannel {
@ -27,8 +27,8 @@ public class EmbeddedStreamChannel extends AbstractEmbeddedChannel {
}
@Override
public ChannelType type() {
return ChannelType.STREAM;
public ChannelBufferType bufferType() {
return ChannelBufferType.STREAM;
}
public ChannelBuffer inboundBuffer() {

View File

@ -20,7 +20,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelType;
import io.netty.channel.ChannelBufferType;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventLoop;
@ -67,8 +67,8 @@ public class LocalChannel extends AbstractChannel {
}
@Override
public ChannelType type() {
return ChannelType.MESSAGE;
public ChannelBufferType bufferType() {
return ChannelBufferType.MESSAGE;
}
@Override

View File

@ -17,7 +17,7 @@ package io.netty.channel.socket.nio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelType;
import io.netty.channel.ChannelBufferType;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
@ -31,8 +31,8 @@ abstract class AbstractNioMessageChannel extends AbstractNioChannel {
}
@Override
public ChannelType type() {
return ChannelType.MESSAGE;
public ChannelBufferType bufferType() {
return ChannelBufferType.MESSAGE;
}
@Override

View File

@ -18,7 +18,7 @@ package io.netty.channel.socket.nio;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelType;
import io.netty.channel.ChannelBufferType;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
@ -32,8 +32,8 @@ abstract class AbstractNioStreamChannel extends AbstractNioChannel {
}
@Override
public ChannelType type() {
return ChannelType.STREAM;
public ChannelBufferType bufferType() {
return ChannelBufferType.STREAM;
}
@Override

View File

@ -17,7 +17,7 @@ package io.netty.channel.socket.oio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelType;
import io.netty.channel.ChannelBufferType;
import java.io.IOException;
import java.util.Queue;
@ -29,8 +29,8 @@ abstract class AbstractOioMessageChannel extends AbstractOioChannel {
}
@Override
public ChannelType type() {
return ChannelType.MESSAGE;
public ChannelBufferType bufferType() {
return ChannelBufferType.MESSAGE;
}
@Override

View File

@ -18,7 +18,7 @@ package io.netty.channel.socket.oio;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelType;
import io.netty.channel.ChannelBufferType;
import java.io.IOException;
@ -29,8 +29,8 @@ abstract class AbstractOioStreamChannel extends AbstractOioChannel {
}
@Override
public ChannelType type() {
return ChannelType.STREAM;
public ChannelBufferType bufferType() {
return ChannelBufferType.STREAM;
}
@Override