Simplify DefaultChannelPipeline

This commit is contained in:
Trustin Lee 2013-01-09 19:13:43 +09:00
parent b742dcc209
commit b6fcf3acc4
3 changed files with 112 additions and 223 deletions

View File

@ -143,7 +143,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private final Runnable nextCtxFireInboundBufferUpdatedTask = new Runnable() {
@Override
public void run() {
DefaultChannelHandlerContext next = nextContext(
DefaultChannelHandlerContext next = findContextInbound(
DefaultChannelHandlerContext.this.next, FLAG_STATE_HANDLER);
if (next != null) {
next.fillBridge();
@ -188,7 +188,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
}
DefaultChannelHandlerContext nextCtx = nextContext(ctx.next, FLAG_STATE_HANDLER);
DefaultChannelHandlerContext nextCtx = findContextInbound(ctx.next, FLAG_STATE_HANDLER);
if (nextCtx != null) {
nextCtx.callFreeInboundBuffer();
} else {
@ -218,7 +218,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
}
DefaultChannelHandlerContext nextCtx = prevContext(ctx.prev, FLAG_OPERATION_HANDLER);
DefaultChannelHandlerContext nextCtx = findContextOutbound(ctx.prev, FLAG_OPERATION_HANDLER);
if (nextCtx != null) {
nextCtx.callFreeOutboundBuffer();
}
@ -941,17 +941,17 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ByteBuf nextOutboundByteBuffer() {
return pipeline.nextOutboundByteBuffer(prev);
return pipeline.findOutboundByteBuffer(prev);
}
@Override
public MessageBuf<Object> nextOutboundMessageBuffer() {
return pipeline.nextOutboundMessageBuffer(prev);
return pipeline.findOutboundMessageBuffer(prev);
}
@Override
public void fireChannelRegistered() {
DefaultChannelHandlerContext next = nextContext(this.next, FLAG_STATE_HANDLER);
DefaultChannelHandlerContext next = findContextInbound(this.next, FLAG_STATE_HANDLER);
if (next != null) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
@ -964,7 +964,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public void fireChannelUnregistered() {
DefaultChannelHandlerContext next = nextContext(this.next, FLAG_STATE_HANDLER);
DefaultChannelHandlerContext next = findContextInbound(this.next, FLAG_STATE_HANDLER);
if (next != null) {
EventExecutor executor = next.executor();
if (executor.inEventLoop() && prev != null) {
@ -977,7 +977,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public void fireChannelActive() {
DefaultChannelHandlerContext next = nextContext(this.next, FLAG_STATE_HANDLER);
DefaultChannelHandlerContext next = findContextInbound(this.next, FLAG_STATE_HANDLER);
if (next != null) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
@ -990,7 +990,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public void fireChannelInactive() {
DefaultChannelHandlerContext next = nextContext(this.next, FLAG_STATE_HANDLER);
DefaultChannelHandlerContext next = findContextInbound(this.next, FLAG_STATE_HANDLER);
if (next != null) {
EventExecutor executor = next.executor();
if (executor.inEventLoop() && prev != null) {
@ -1085,7 +1085,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public void fireInboundBufferSuspended() {
DefaultChannelHandlerContext next = nextContext(this.next, FLAG_STATE_HANDLER);
DefaultChannelHandlerContext next = findContextInbound(this.next, FLAG_STATE_HANDLER);
if (next != null) {
EventExecutor executor = next.executor();
if (executor.inEventLoop() && prev != null) {
@ -1138,7 +1138,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(prevContext(prev, FLAG_OPERATION_HANDLER), localAddress, promise);
return pipeline.bind(findContextOutbound(prev, FLAG_OPERATION_HANDLER), localAddress, promise);
}
@Override
@ -1148,34 +1148,35 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return pipeline.connect(prevContext(prev, FLAG_OPERATION_HANDLER), remoteAddress, localAddress, promise);
return pipeline.connect(
findContextOutbound(prev, FLAG_OPERATION_HANDLER), remoteAddress, localAddress, promise);
}
@Override
public ChannelFuture disconnect(ChannelPromise promise) {
return pipeline.disconnect(prevContext(prev, FLAG_OPERATION_HANDLER), promise);
return pipeline.disconnect(findContextOutbound(prev, FLAG_OPERATION_HANDLER), promise);
}
@Override
public ChannelFuture close(ChannelPromise promise) {
return pipeline.close(prevContext(prev, FLAG_OPERATION_HANDLER), promise);
return pipeline.close(findContextOutbound(prev, FLAG_OPERATION_HANDLER), promise);
}
@Override
public ChannelFuture deregister(ChannelPromise promise) {
return pipeline.deregister(prevContext(prev, FLAG_OPERATION_HANDLER), promise);
return pipeline.deregister(findContextOutbound(prev, FLAG_OPERATION_HANDLER), promise);
}
@Override
public void read() {
pipeline.read(prevContext(prev, FLAG_OPERATION_HANDLER));
pipeline.read(findContextOutbound(prev, FLAG_OPERATION_HANDLER));
}
@Override
public ChannelFuture flush(final ChannelPromise promise) {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
DefaultChannelHandlerContext prev = prevContext(this.prev, FLAG_OPERATION_HANDLER);
DefaultChannelHandlerContext prev = findContextOutbound(this.prev, FLAG_OPERATION_HANDLER);
prev.fillBridge();
pipeline.flush(prev, promise);
} else {
@ -1310,11 +1311,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ChannelFuture sendFile(FileRegion region) {
return pipeline.sendFile(prevContext(prev, FLAG_OPERATION_HANDLER), region, newPromise());
return pipeline.sendFile(findContextOutbound(prev, FLAG_OPERATION_HANDLER), region, newPromise());
}
@Override
public ChannelFuture sendFile(FileRegion region, ChannelPromise promise) {
return pipeline.sendFile(prevContext(prev, FLAG_OPERATION_HANDLER), region, promise);
return pipeline.sendFile(findContextOutbound(prev, FLAG_OPERATION_HANDLER), region, promise);
}
}

View File

@ -47,8 +47,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
private final Channel.Unsafe unsafe;
final DefaultChannelHandlerContext head;
private volatile DefaultChannelHandlerContext tail;
private final DefaultChannelHandlerContext tailCtx;
final DefaultChannelHandlerContext tail;
private final Map<String, DefaultChannelHandlerContext> name2ctx =
new HashMap<String, DefaultChannelHandlerContext>(4);
@ -66,7 +65,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
this.channel = channel;
tailCtx = new DefaultChannelHandlerContext(
tail = new DefaultChannelHandlerContext(
this, null, null, null, generateName(TAIL_HANDLER), TAIL_HANDLER);
HeadHandler headHandler;
@ -82,9 +81,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
head = new DefaultChannelHandlerContext(
this, null, null, tailCtx, generateName(headHandler), headHandler);
tailCtx.prev = head;
tail = tailCtx;
this, null, null, tail, generateName(headHandler), headHandler);
tail.prev = head;
unsafe = channel.unsafe();
}
@ -101,16 +99,14 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) {
final DefaultChannelHandlerContext nextCtx;
final DefaultChannelHandlerContext newCtx;
synchronized (this) {
checkDuplicateName(name);
nextCtx = head.next;
newCtx = new DefaultChannelHandlerContext(this, group, head, nextCtx, name, handler);
newCtx = new DefaultChannelHandlerContext(this, group, null, null, name, handler);
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
addFirst0(name, nextCtx, newCtx);
addFirst0(name, newCtx);
return this;
}
}
@ -123,7 +119,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
public void run() {
synchronized (DefaultChannelPipeline.this) {
checkDuplicateName(name);
addFirst0(name, nextCtx, newCtx);
addFirst0(name, newCtx);
}
}
});
@ -131,19 +127,14 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return this;
}
private void addFirst0(
final String name, DefaultChannelHandlerContext nextCtx, DefaultChannelHandlerContext newCtx) {
private void addFirst0(String name, DefaultChannelHandlerContext newCtx) {
callBeforeAdd(newCtx);
if (nextCtx != null) {
nextCtx.prev = newCtx;
}
if (head.next == tailCtx) {
tail = newCtx;
newCtx.next = tailCtx;
tailCtx.prev = newCtx;
}
DefaultChannelHandlerContext nextCtx = head.next;
head.next = newCtx;
newCtx.prev = head;
newCtx.next = nextCtx;
nextCtx.prev = newCtx;
name2ctx.put(name, newCtx);
@ -157,16 +148,14 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
final DefaultChannelHandlerContext oldTail;
final DefaultChannelHandlerContext newTail;
final DefaultChannelHandlerContext newCtx;
synchronized (this) {
checkDuplicateName(name);
oldTail = tail;
newTail = new DefaultChannelHandlerContext(this, group, null, null, name, handler);
if (!newTail.channel().isRegistered() || newTail.executor().inEventLoop()) {
addLast0(name, oldTail, newTail);
newCtx = new DefaultChannelHandlerContext(this, group, null, null, name, handler);
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
addLast0(name, newCtx);
return this;
}
}
@ -174,12 +163,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
// Run the following 'waiting' code outside of the above synchronized block
// in order to avoid deadlock
newTail.executeOnEventLoop(new Runnable() {
newCtx.executeOnEventLoop(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
checkDuplicateName(name);
addLast0(name, oldTail, newTail);
addLast0(name, newCtx);
}
}
});
@ -188,28 +177,18 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
private void addLast0(
final String name, DefaultChannelHandlerContext oldTail, DefaultChannelHandlerContext newTail) {
callBeforeAdd(newTail);
final String name, DefaultChannelHandlerContext newCtx) {
callBeforeAdd(newCtx);
DefaultChannelHandlerContext prev = oldTail.prev;
if (oldTail == tailCtx) {
// This is the first handler added
tailCtx.prev = newTail;
newTail.next = tailCtx;
prev.next = newTail;
newTail.prev = prev;
} else {
oldTail.next = newTail;
newTail.prev = oldTail;
DefaultChannelHandlerContext prev = tail.prev;
prev.next = newCtx;
newCtx.prev = prev;
newCtx.next = tail;
tail.prev = newCtx;
prev.next = oldTail;
oldTail.prev = prev;
}
name2ctx.put(name, newCtx);
tail = newTail;
name2ctx.put(name, newTail);
callAfterAdd(newTail);
callAfterAdd(newCtx);
}
@Override
@ -273,11 +252,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
synchronized (this) {
ctx = getContextOrDie(baseName);
if (ctx == tail) {
return addLast(name, handler);
}
checkDuplicateName(name);
newCtx = new DefaultChannelHandlerContext(this, group, ctx, ctx.next, name, handler);
newCtx = new DefaultChannelHandlerContext(this, group, null, null, name, handler);
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
addAfter0(name, ctx, newCtx);
@ -306,8 +282,11 @@ final class DefaultChannelPipeline implements ChannelPipeline {
callBeforeAdd(newCtx);
newCtx.prev = ctx;
newCtx.next = ctx.next;
ctx.next.prev = newCtx;
ctx.next = newCtx;
name2ctx.put(name, newCtx);
callAfterAdd(newCtx);
@ -391,38 +370,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
private DefaultChannelHandlerContext remove(final DefaultChannelHandlerContext ctx) {
assert ctx != head && ctx != tail;
DefaultChannelHandlerContext context;
Future<?> future;
synchronized (this) {
if (ctx == tailCtx) {
throw new NoSuchElementException();
}
if (head == tail) {
return null;
} else if (ctx == head) {
throw new Error(); // Should never happen.
} else if (ctx == tail) {
if (tail == tailCtx) {
throw new NoSuchElementException();
}
final DefaultChannelHandlerContext oldTail = tail;
if (!oldTail.channel().isRegistered() || oldTail.executor().inEventLoop()) {
removeLast0(oldTail);
return oldTail;
} else {
future = oldTail.executor().submit(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
removeLast0(oldTail);
}
}
});
context = oldTail;
}
} else {
if (!ctx.channel().isRegistered() || ctx.executor().inEventLoop()) {
remove0(ctx);
return ctx;
@ -438,7 +391,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
context = ctx;
}
}
}
// Run the following 'waiting' code outside of the above synchronized block
// in order to avoid deadlock
@ -462,7 +414,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelHandler removeFirst() {
if (head.next == tailCtx) {
if (head.next == tail) {
throw new NoSuchElementException();
}
return remove(head.next).handler();
@ -470,44 +422,10 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelHandler removeLast() {
final DefaultChannelHandlerContext oldTail;
synchronized (this) {
if (tail == tailCtx) {
if (head.next == tail) {
throw new NoSuchElementException();
}
oldTail = tail;
if (!oldTail.channel().isRegistered() || oldTail.executor().inEventLoop()) {
removeLast0(oldTail);
return oldTail.handler();
}
}
// Run the following 'waiting' code outside of the above synchronized block
// in order to avoid deadlock
oldTail.executeOnEventLoop(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
removeLast0(oldTail);
}
}
});
return oldTail.handler();
}
private void removeLast0(DefaultChannelHandlerContext oldTail) {
callBeforeRemove(oldTail);
tailCtx.prev = oldTail.prev;
oldTail.prev.next = tailCtx;
tail = oldTail.prev;
name2ctx.remove(oldTail.name());
callBeforeRemove(oldTail);
return remove(tail.prev).handler();
}
@Override
@ -530,49 +448,18 @@ final class DefaultChannelPipeline implements ChannelPipeline {
private ChannelHandler replace(
final DefaultChannelHandlerContext ctx, final String newName, ChannelHandler newHandler) {
assert ctx != head && ctx != tail;
Future<?> future;
synchronized (this) {
if (ctx == tailCtx) {
throw new NoSuchElementException();
}
if (ctx == head) {
throw new IllegalArgumentException();
} else if (ctx == tail) {
if (tail == tailCtx) {
throw new NoSuchElementException();
}
final DefaultChannelHandlerContext oldTail = tail;
final DefaultChannelHandlerContext newTail =
new DefaultChannelHandlerContext(this, null, oldTail, null, newName, newHandler);
if (!oldTail.channel().isRegistered() || oldTail.executor().inEventLoop()) {
removeLast0(oldTail);
checkDuplicateName(newName);
addLast0(newName, tail, newTail);
return ctx.handler();
} else {
future = oldTail.executor().submit(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
removeLast0(oldTail);
checkDuplicateName(newName);
addLast0(newName, tail, newTail);
}
}
});
}
} else {
boolean sameName = ctx.name().equals(newName);
if (!sameName) {
checkDuplicateName(newName);
}
DefaultChannelHandlerContext prev = ctx.prev;
DefaultChannelHandlerContext next = ctx.next;
final DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, ctx.executor, prev, next, newName, newHandler);
new DefaultChannelHandlerContext(this, ctx.executor, null, null, newName, newHandler);
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
replace0(ctx, newName, newCtx);
@ -588,7 +475,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
});
}
}
}
// Run the following 'waiting' code outside of the above synchronized block
// in order to avoid deadlock
@ -607,6 +493,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
callBeforeRemove(ctx);
callBeforeAdd(newCtx);
newCtx.prev = prev;
newCtx.next = next;
prev.next = newCtx;
next.prev = newCtx;
@ -729,8 +617,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelHandler last() {
DefaultChannelHandlerContext last = tail;
if (last == tailCtx || last == null) {
DefaultChannelHandlerContext last = tail.prev;
if (last == head) {
return null;
}
return last.handler();
@ -738,8 +626,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelHandlerContext lastContext() {
DefaultChannelHandlerContext last = tail;
if (last == head || last == null) {
DefaultChannelHandlerContext last = tail.prev;
if (last == head) {
return null;
}
return last;
@ -834,7 +722,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
DefaultChannelHandlerContext ctx = head.next;
for (;;) {
if (ctx == null || ctx == tailCtx) {
if (ctx == tail) {
return map;
}
map.put(ctx.name(), ctx.handler());
@ -863,7 +751,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
buf.append(')');
ctx = ctx.next;
if (ctx == null) {
if (ctx == tail) {
break;
}
@ -887,15 +775,15 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
@SuppressWarnings("unchecked")
public <T> MessageBuf<T> outboundMessageBuffer() {
return (MessageBuf<T>) nextOutboundMessageBuffer(tail);
return (MessageBuf<T>) findOutboundMessageBuffer(tail.prev);
}
@Override
public ByteBuf outboundByteBuffer() {
return nextOutboundByteBuffer(tail);
return findOutboundByteBuffer(tail.prev);
}
ByteBuf nextOutboundByteBuffer(DefaultChannelHandlerContext ctx) {
ByteBuf findOutboundByteBuffer(DefaultChannelHandlerContext ctx) {
final DefaultChannelHandlerContext initialCtx = ctx;
final Thread currentThread = Thread.currentThread();
for (;;) {
@ -931,7 +819,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
}
MessageBuf<Object> nextOutboundMessageBuffer(DefaultChannelHandlerContext ctx) {
MessageBuf<Object> findOutboundMessageBuffer(DefaultChannelHandlerContext ctx) {
final DefaultChannelHandlerContext initialCtx = ctx;
final Thread currentThread = Thread.currentThread();
for (;;) {
@ -1333,7 +1221,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
if (message instanceof FileRegion) {
return sendFile((FileRegion) message, promise);
}
return write(tail, message, promise);
return write(tail.prev, message, promise);
}
ChannelFuture write(DefaultChannelHandlerContext ctx, final Object message, final ChannelPromise promise) {
@ -1425,10 +1313,10 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
DefaultChannelHandlerContext lastContext(int flag) {
return prevContext(tail, flag);
return findContextOutbound(tail.prev, flag);
}
static DefaultChannelHandlerContext nextContext(DefaultChannelHandlerContext ctx, int flag) {
static DefaultChannelHandlerContext findContextInbound(DefaultChannelHandlerContext ctx, int flag) {
if (ctx == null) {
return null;
}
@ -1443,7 +1331,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return realCtx;
}
static DefaultChannelHandlerContext prevContext(DefaultChannelHandlerContext ctx, int flag) {
static DefaultChannelHandlerContext findContextOutbound(DefaultChannelHandlerContext ctx, int flag) {
if (ctx == null) {
return null;
}

View File

@ -214,7 +214,7 @@ public class DefaultChannelPipelineTest {
private static void verifyContextNumber(DefaultChannelPipeline pipeline, int expectedNumber) {
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) pipeline.firstContext();
int handlerNumber = 0;
while (ctx != null) {
while (ctx != pipeline.tail) {
handlerNumber++;
ctx = ctx.next;
}