Overhaul pipeline implementation for clarity and memory efficiency

This pull request cleans up our pipeline implementation by moving most
inter-context traversal code to DefaultChannelHandlerContext.
Previously, outbound traversal was done in DefaultChannelPipeline while
inbound traversal was done in DefaultChannelHandlerContext.

Also, to address the memory inefficiency issue raised in #920, all
runnables are lazily instantiated.
This commit is contained in:
Trustin Lee 2013-01-15 16:23:09 +09:00
parent 337f5bbb8e
commit 506474f569
2 changed files with 784 additions and 756 deletions

View File

@ -23,7 +23,6 @@ import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.IdentityHashMap; import java.util.IdentityHashMap;
@ -79,7 +78,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
throw new Error("unknown buffer type: " + channel.metadata().bufferType()); throw new Error("unknown buffer type: " + channel.metadata().bufferType());
} }
head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler); head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler, true);
head.next = tail; head.next = tail;
tail.prev = head; tail.prev = head;
@ -814,84 +813,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <T> MessageBuf<T> outboundMessageBuffer() { public <T> MessageBuf<T> outboundMessageBuffer() {
return (MessageBuf<T>) findOutboundMessageBuffer(tail.prev); return (MessageBuf<T>) tail.nextOutboundMessageBuffer();
} }
@Override @Override
public ByteBuf outboundByteBuffer() { public ByteBuf outboundByteBuffer() {
return findOutboundByteBuffer(tail.prev); return tail.nextOutboundByteBuffer();
}
ByteBuf findOutboundByteBuffer(DefaultChannelHandlerContext ctx) {
final DefaultChannelHandlerContext initialCtx = ctx;
final Thread currentThread = Thread.currentThread();
for (;;) {
if (ctx == null) {
if (initialCtx != null && initialCtx.next != null) {
throw new NoSuchBufferException(String.format(
"the handler '%s' could not find a %s whose outbound buffer is %s.",
initialCtx.next.name(), ChannelOutboundHandler.class.getSimpleName(),
ByteBuf.class.getSimpleName()));
} else {
throw new NoSuchBufferException(String.format(
"the pipeline does not contain a %s whose outbound buffer is %s.",
ChannelOutboundHandler.class.getSimpleName(),
ByteBuf.class.getSimpleName()));
}
}
if (ctx.hasOutboundByteBuffer()) {
if (ctx.executor().inEventLoop(currentThread)) {
return ctx.outboundByteBuffer();
} else {
ByteBridge bridge = ctx.outByteBridge.get();
if (bridge == null) {
bridge = new ByteBridge(ctx);
if (!ctx.outByteBridge.compareAndSet(null, bridge)) {
bridge = ctx.outByteBridge.get();
}
}
return bridge.byteBuf;
}
}
ctx = ctx.prev;
}
}
MessageBuf<Object> findOutboundMessageBuffer(DefaultChannelHandlerContext ctx) {
final DefaultChannelHandlerContext initialCtx = ctx;
final Thread currentThread = Thread.currentThread();
for (;;) {
if (ctx == null) {
if (initialCtx.next != null) {
throw new NoSuchBufferException(String.format(
"the handler '%s' could not find a %s whose outbound buffer is %s.",
initialCtx.next.name(), ChannelOutboundHandler.class.getSimpleName(),
MessageBuf.class.getSimpleName()));
} else {
throw new NoSuchBufferException(String.format(
"the pipeline does not contain a %s whose outbound buffer is %s.",
ChannelOutboundHandler.class.getSimpleName(),
MessageBuf.class.getSimpleName()));
}
}
if (ctx.hasOutboundMessageBuffer()) {
if (ctx.executor().inEventLoop(currentThread)) {
return ctx.outboundMessageBuffer();
} else {
MessageBridge bridge = ctx.outMsgBridge.get();
if (bridge == null) {
bridge = new MessageBridge();
if (!ctx.outMsgBridge.compareAndSet(null, bridge)) {
bridge = ctx.outMsgBridge.get();
}
}
return bridge.msgBuf;
}
}
ctx = ctx.prev;
}
} }
@Override @Override
@ -905,7 +832,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
// Free all buffers if channel is closed and unregistered. // Free all buffers if channel is closed and unregistered.
if (!channel.isOpen()) { if (!channel.isOpen()) {
head.callFreeInboundBuffer(); head.invokeFreeInboundBuffer();
} }
} }
@ -956,436 +883,106 @@ final class DefaultChannelPipeline implements ChannelPipeline {
public void fireInboundBufferSuspended() { public void fireInboundBufferSuspended() {
head.fireInboundBufferSuspended(); head.fireInboundBufferSuspended();
if (channel.config().isAutoRead()) { if (channel.config().isAutoRead()) {
channel.read(); read();
} }
} }
@Override @Override
public ChannelFuture bind(SocketAddress localAddress) { public ChannelFuture bind(SocketAddress localAddress) {
return bind(localAddress, channel.newPromise()); return tail.bind(localAddress);
} }
@Override @Override
public ChannelFuture connect(SocketAddress remoteAddress) { public ChannelFuture connect(SocketAddress remoteAddress) {
return connect(remoteAddress, channel.newPromise()); return tail.connect(remoteAddress);
} }
@Override @Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return connect(remoteAddress, localAddress, channel.newPromise()); return tail.connect(remoteAddress, localAddress);
} }
@Override @Override
public ChannelFuture disconnect() { public ChannelFuture disconnect() {
return disconnect(channel.newPromise()); return tail.disconnect();
} }
@Override @Override
public ChannelFuture close() { public ChannelFuture close() {
return close(channel.newPromise()); return tail.close();
} }
@Override @Override
public ChannelFuture deregister() { public ChannelFuture deregister() {
return deregister(channel.newPromise()); return tail.deregister();
} }
@Override @Override
public ChannelFuture flush() { public ChannelFuture flush() {
return flush(channel.newPromise()); return tail.flush();
} }
@Override @Override
public ChannelFuture write(Object message) { public ChannelFuture write(Object message) {
return write(message, channel.newPromise()); return tail.write(message);
} }
@Override @Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return bind(lastContext(FLAG_OPERATION_HANDLER), localAddress, promise); return tail.bind(localAddress, promise);
}
ChannelFuture bind(
final DefaultChannelHandlerContext ctx, final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
validateFuture(promise);
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
try {
((ChannelOperationHandler) ctx.handler()).bind(ctx, localAddress, promise);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
executor.execute(new Runnable() {
@Override
public void run() {
bind(ctx, localAddress, promise);
}
});
}
return promise;
} }
@Override @Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return connect(remoteAddress, null, promise); return tail.connect(remoteAddress, promise);
} }
@Override @Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return connect(lastContext(FLAG_OPERATION_HANDLER), remoteAddress, localAddress, promise); return tail.connect(remoteAddress, localAddress, promise);
}
ChannelFuture connect(
final DefaultChannelHandlerContext ctx, final SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
validateFuture(promise);
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
try {
((ChannelOperationHandler) ctx.handler()).connect(ctx, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
executor.execute(new Runnable() {
@Override
public void run() {
connect(ctx, remoteAddress, localAddress, promise);
}
});
}
return promise;
} }
@Override @Override
public ChannelFuture disconnect(ChannelPromise promise) { public ChannelFuture disconnect(ChannelPromise promise) {
return disconnect(lastContext(FLAG_OPERATION_HANDLER), promise); return tail.disconnect(promise);
}
ChannelFuture disconnect(final DefaultChannelHandlerContext ctx, final ChannelPromise promise) {
// Translate disconnect to close if the channel has no notion of disconnect-reconnect.
// So far, UDP/IP is the only transport that has such behavior.
if (!ctx.channel().metadata().hasDisconnect()) {
return close(ctx, promise);
}
validateFuture(promise);
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
try {
((ChannelOperationHandler) ctx.handler()).disconnect(ctx, promise);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
executor.execute(new Runnable() {
@Override
public void run() {
disconnect(ctx, promise);
}
});
}
return promise;
} }
@Override @Override
public ChannelFuture close(ChannelPromise promise) { public ChannelFuture close(ChannelPromise promise) {
return close(lastContext(FLAG_OPERATION_HANDLER), promise); return tail.close(promise);
}
ChannelFuture close(final DefaultChannelHandlerContext ctx, final ChannelPromise promise) {
validateFuture(promise);
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
try {
((ChannelOperationHandler) ctx.handler()).close(ctx, promise);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
executor.execute(new Runnable() {
@Override
public void run() {
close(ctx, promise);
}
});
}
return promise;
} }
@Override @Override
public ChannelFuture deregister(final ChannelPromise promise) { public ChannelFuture deregister(final ChannelPromise promise) {
return deregister(lastContext(FLAG_OPERATION_HANDLER), promise); return tail.deregister(promise);
}
ChannelFuture deregister(final DefaultChannelHandlerContext ctx, final ChannelPromise promise) {
validateFuture(promise);
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
try {
((ChannelOperationHandler) ctx.handler()).deregister(ctx, promise);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
executor.execute(new Runnable() {
@Override
public void run() {
deregister(ctx, promise);
}
});
}
return promise;
}
@Override
public ChannelFuture sendFile(FileRegion region) {
return sendFile(region, channel().newPromise());
}
@Override
public ChannelFuture sendFile(FileRegion region, ChannelPromise promise) {
return sendFile(lastContext(FLAG_OPERATION_HANDLER), region, promise);
}
ChannelFuture sendFile(final DefaultChannelHandlerContext ctx, final FileRegion region,
final ChannelPromise promise) {
if (region == null) {
throw new NullPointerException("region");
}
validateFuture(promise);
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
try {
ctx.flushBridge();
((ChannelOperationHandler) ctx.handler()).sendFile(ctx, region, promise);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
executor.execute(new Runnable() {
@Override
public void run() {
sendFile(ctx, region, promise);
}
});
}
return promise;
} }
@Override @Override
public void read() { public void read() {
read(lastContext(FLAG_OPERATION_HANDLER)); tail.read();
}
void read(final DefaultChannelHandlerContext ctx) {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
read0(ctx);
} else {
executor.execute(ctx.read0Task);
}
}
void read0(DefaultChannelHandlerContext ctx) {
try {
((ChannelOperationHandler) ctx.handler()).read(ctx);
} catch (Throwable t) {
notifyHandlerException(t);
}
} }
@Override @Override
public ChannelFuture flush(ChannelPromise promise) { public ChannelFuture flush(ChannelPromise promise) {
return flush(lastContext(FLAG_OPERATION_HANDLER), promise); return tail.flush(promise);
} }
ChannelFuture flush(final DefaultChannelHandlerContext ctx, final ChannelPromise promise) {
validateFuture(promise);
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
flush0(ctx, promise);
} else {
executor.execute(new Runnable() {
@Override @Override
public void run() { public ChannelFuture sendFile(FileRegion region) {
flush0(ctx, promise); return tail.sendFile(region);
}
});
} }
return promise; @Override
} public ChannelFuture sendFile(FileRegion region, ChannelPromise promise) {
return tail.sendFile(region, promise);
private void flush0(final DefaultChannelHandlerContext ctx, ChannelPromise promise) {
if (!channel.isRegistered() && !channel.isActive()) {
promise.setFailure(new ClosedChannelException());
return;
}
ChannelOperationHandler handler = (ChannelOperationHandler) ctx.handler();
try {
ctx.flushBridge();
handler.flush(ctx, promise);
} catch (Throwable t) {
notifyHandlerException(t);
} finally {
if (handler instanceof ChannelOutboundByteHandler) {
try {
((ChannelOutboundByteHandler) handler).discardOutboundReadBytes(ctx);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
}
} }
@Override @Override
public ChannelFuture write(Object message, ChannelPromise promise) { public ChannelFuture write(Object message, ChannelPromise promise) {
if (message instanceof FileRegion) { return tail.write(message, promise);
return sendFile((FileRegion) message, promise);
}
return write(tail.prev, message, promise);
} }
ChannelFuture write(DefaultChannelHandlerContext ctx, final Object message, final ChannelPromise promise) { void notifyHandlerException(Throwable cause) {
if (message == null) {
throw new NullPointerException("message");
}
validateFuture(promise);
final DefaultChannelHandlerContext initialCtx = ctx;
EventExecutor executor;
boolean msgBuf = false;
for (;;) {
if (ctx == null) {
if (initialCtx.next != null) {
throw new NoSuchBufferException(String.format(
"the handler '%s' could not find a %s which accepts a %s, and " +
"the transport does not accept it as-is.",
initialCtx.next.name(),
ChannelOutboundHandler.class.getSimpleName(),
message.getClass().getSimpleName()));
} else {
throw new NoSuchBufferException(String.format(
"the pipeline does not contain a %s which accepts a %s, and " +
"the transport does not accept it as-is.",
ChannelOutboundHandler.class.getSimpleName(),
message.getClass().getSimpleName()));
}
}
if (ctx.hasOutboundMessageBuffer()) {
msgBuf = true;
executor = ctx.executor();
break;
}
if (message instanceof ByteBuf && ctx.hasOutboundByteBuffer()) {
executor = ctx.executor();
break;
}
ctx = ctx.prev;
}
if (executor.inEventLoop()) {
write0(ctx, message, promise, msgBuf);
return promise;
}
final boolean msgBuf0 = msgBuf;
final DefaultChannelHandlerContext ctx0 = ctx;
executor.execute(new Runnable() {
@Override
public void run() {
write0(ctx0, message, promise, msgBuf0);
}
});
return promise;
}
private void write0(DefaultChannelHandlerContext ctx, Object message, ChannelPromise promise, boolean msgBuf) {
if (!channel.isRegistered() && !channel.isActive()) {
promise.setFailure(new ClosedChannelException());
return;
}
if (msgBuf) {
ctx.outboundMessageBuffer().add(message);
} else {
ByteBuf buf = (ByteBuf) message;
ctx.outboundByteBuffer().writeBytes(buf, buf.readerIndex(), buf.readableBytes());
}
flush0(ctx, promise);
}
private void validateFuture(ChannelFuture future) {
if (future == null) {
throw new NullPointerException("future");
}
if (future.channel() != channel) {
throw new IllegalArgumentException(String.format(
"future.channel does not match: %s (expected: %s)", future.channel(), channel));
}
if (future.isDone()) {
throw new IllegalArgumentException("future already done");
}
if (future instanceof ChannelFuture.Unsafe) {
throw new IllegalArgumentException("internal use only future not allowed");
}
}
DefaultChannelHandlerContext lastContext(int flag) {
return findContextOutbound(tail.prev, flag);
}
static DefaultChannelHandlerContext findContextInbound(DefaultChannelHandlerContext ctx, int flag) {
if (ctx == null) {
return null;
}
DefaultChannelHandlerContext realCtx = ctx;
while ((realCtx.flags & flag) == 0) {
realCtx = realCtx.next;
if (realCtx == null) {
return null;
}
}
return realCtx;
}
static DefaultChannelHandlerContext findContextOutbound(DefaultChannelHandlerContext ctx, int flag) {
if (ctx == null) {
return null;
}
DefaultChannelHandlerContext realCtx = ctx;
while ((realCtx.flags & flag) == 0) {
realCtx = realCtx.prev;
if (realCtx == null) {
return null;
}
}
return realCtx;
}
protected void notifyHandlerException(Throwable cause) {
if (!(cause instanceof ChannelPipelineException)) { if (!(cause instanceof ChannelPipelineException)) {
cause = new ChannelPipelineException(cause); cause = new ChannelPipelineException(cause);
} }