This commit is contained in:
Norman Maurer 2019-11-29 08:59:47 +01:00
parent ee593ace33
commit 0bc7b48deb
2 changed files with 133 additions and 40 deletions

View File

@ -76,6 +76,11 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
this.handler = handler;
}
void unlink() {
prev = null;
next = null;
}
private Tasks invokeTasks() {
Tasks tasks = invokeTasks;
if (tasks == null) {
@ -796,6 +801,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private DefaultChannelHandlerContext findContextInbound(int mask) {
DefaultChannelHandlerContext ctx = this;
if (ctx.next == null) {
assert handlerState == REMOVE_COMPLETE;
return pipeline.empty;
}
do {
ctx = ctx.next;
} while ((ctx.executionMask & mask) == 0);
@ -804,6 +813,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private DefaultChannelHandlerContext findContextOutbound(int mask) {
DefaultChannelHandlerContext ctx = this;
if (ctx.prev == null) {
assert handlerState == REMOVE_COMPLETE;
return pipeline.empty;
}
do {
ctx = ctx.prev;
} while ((ctx.executionMask & mask) == 0);
@ -925,7 +938,8 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
}
}
protected abstract DefaultChannelHandlerContext findContext(DefaultChannelHandlerContext ctx);
protected abstract DefaultChannelHandlerContext findContext(
DefaultChannelHandlerContext ctx);
@Override
public final void run() {
try {
@ -966,13 +980,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable {
private static final ObjectPool<WriteTask> RECYCLER = ObjectPool.newPool(
new ObjectPool.ObjectCreator<WriteTask>() {
@Override
public WriteTask newObject(ObjectPool.Handle<WriteTask> handle) {
return new WriteTask(handle);
}
});
private static final ObjectPool<WriteTask> RECYCLER = ObjectPool.newPool(WriteTask::new);
static WriteTask newInstance(
DefaultChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
@ -993,13 +1001,8 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
static final class WriteAndFlushTask extends AbstractWriteTask {
private static final ObjectPool<WriteAndFlushTask> RECYCLER = ObjectPool.newPool(
new ObjectPool.ObjectCreator<WriteAndFlushTask>() {
@Override
public WriteAndFlushTask newObject(ObjectPool.Handle<WriteAndFlushTask> handle) {
return new WriteAndFlushTask(handle);
}
});
private static final ObjectPool<WriteAndFlushTask> RECYCLER = ObjectPool.newPool(WriteAndFlushTask::new);
static WriteAndFlushTask newInstance(
DefaultChannelHandlerContext ctx, Object msg, ChannelPromise promise) {

View File

@ -48,13 +48,16 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
private static final String HEAD_NAME = generateName0(HeadHandler.class);
private static final String TAIL_NAME = generateName0(TailHandler.class);
private static final String EMPTY_NAME = generateName0(EmptyHandler.class);
private static final ChannelHandler HEAD_HANDLER = new HeadHandler();
private static final ChannelHandler TAIL_HANDLER = new TailHandler();
private static final ChannelHandler EMPTY_HANDLER = new EmptyHandler();
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
new FastThreadLocal<Map<Class<?>, String>>() {
@Override
protected Map<Class<?>, String> initialValue() throws Exception {
protected Map<Class<?>, String> initialValue() {
return new WeakHashMap<>();
}
};
@ -64,6 +67,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
private final DefaultChannelHandlerContext head;
private final DefaultChannelHandlerContext tail;
final DefaultChannelHandlerContext empty;
private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;
@ -76,10 +80,9 @@ public class DefaultChannelPipeline implements ChannelPipeline {
this.channel = requireNonNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, channel.eventLoop());
voidPromise = new VoidChannelPromise(channel, true);
empty = new DefaultChannelHandlerContext(this, EMPTY_NAME, EMPTY_HANDLER);
tail = new DefaultChannelHandlerContext(this, TAIL_NAME, TAIL_HANDLER);
head = new DefaultChannelHandlerContext(this, HEAD_NAME, HEAD_HANDLER);
head.next = tail;
tail.prev = head;
head.setAddComplete();
@ -496,7 +499,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return (T) ctx.handler();
}
private void unlink(DefaultChannelHandlerContext ctx) {
private void relink(DefaultChannelHandlerContext ctx) {
assert ctx != head && ctx != tail;
DefaultChannelHandlerContext prev = ctx.prev;
DefaultChannelHandlerContext next = ctx.next;
@ -505,8 +508,9 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
private void remove0(DefaultChannelHandlerContext ctx) {
unlink(ctx);
relink(ctx);
callHandlerRemoved0(ctx);
ctx.unlink();
}
@Override
@ -570,27 +574,31 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
private void replace0(DefaultChannelHandlerContext oldCtx, DefaultChannelHandlerContext newCtx) {
DefaultChannelHandlerContext prev = oldCtx.prev;
DefaultChannelHandlerContext next = oldCtx.next;
newCtx.prev = prev;
newCtx.next = next;
try {
DefaultChannelHandlerContext prev = oldCtx.prev;
DefaultChannelHandlerContext next = oldCtx.next;
newCtx.prev = prev;
newCtx.next = next;
// Finish the replacement of oldCtx with newCtx in the linked list.
// Note that this doesn't mean events will be sent to the new handler immediately
// because we are currently at the event handler thread and no more than one handler methods can be invoked
// at the same time (we ensured that in replace().)
prev.next = newCtx;
next.prev = newCtx;
// Finish the replacement of oldCtx with newCtx in the linked list.
// Note that this doesn't mean events will be sent to the new handler immediately
// because we are currently at the event handler thread and no more than one handler methods can be invoked
// at the same time (we ensured that in replace().)
prev.next = newCtx;
next.prev = newCtx;
// update the reference to the replacement so forward of buffered content will work correctly
oldCtx.prev = newCtx;
oldCtx.next = newCtx;
// update the reference to the replacement so forward of buffered content will work correctly
oldCtx.prev = newCtx;
oldCtx.next = newCtx;
// Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
// because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
// event handlers must be called after handlerAdded().
callHandlerAdded0(newCtx);
callHandlerRemoved0(oldCtx);
// Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
// because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
// event handlers must be called after handlerAdded().
callHandlerAdded0(newCtx);
callHandlerRemoved0(oldCtx);
} finally {
oldCtx.unlink();
}
}
private static void checkMultiplicity(ChannelHandler handler) {
@ -615,7 +623,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
handlers.remove(ctx);
}
unlink(ctx);
relink(ctx);
ctx.callHandlerRemoved();
removed = true;
@ -623,6 +631,8 @@ public class DefaultChannelPipeline implements ChannelPipeline {
if (logger.isWarnEnabled()) {
logger.warn("Failed to remove a handler: " + ctx.name(), t2);
}
} finally {
ctx.unlink();
}
if (removed) {
@ -833,9 +843,10 @@ public class DefaultChannelPipeline implements ChannelPipeline {
synchronized (handlers) {
handlers.remove(ctx);
}
DefaultChannelHandlerContext prev = ctx.prev;
remove0(ctx);
ctx = ctx.prev;
ctx = prev;
}
}
@ -1207,4 +1218,83 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
}
}
private static final class EmptyHandler implements ChannelHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) { }
@Override
public void channelUnregistered(ChannelHandlerContext ctx) { }
@Override
public void channelActive(ChannelHandlerContext ctx) { }
@Override
public void channelInactive(ChannelHandlerContext ctx) { }
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
ReferenceCountUtil.release(evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) { }
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { }
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
promise.setFailure(new ChannelPipelineException("Handler " + ctx.handler() + " removed already"));
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) {
promise.setFailure(new ChannelPipelineException("Handler " + ctx.handler() + " removed already"));
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
promise.setFailure(new ChannelPipelineException("Handler " + ctx.handler() + " removed already"));
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
promise.setFailure(new ChannelPipelineException("Handler " + ctx.handler() + " removed already"));
}
@Override
public void register(ChannelHandlerContext ctx, ChannelPromise promise) {
promise.setFailure(new ChannelPipelineException("Handler " + ctx.handler() + " removed already"));
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
promise.setFailure(new ChannelPipelineException("Handler " + ctx.handler() + " removed already"));
}
@Override
public void read(ChannelHandlerContext ctx) { }
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ReferenceCountUtil.release(msg);
promise.setFailure(new ChannelPipelineException("Handler " + ctx.handler() + " removed already"));
}
@Override
public void flush(ChannelHandlerContext ctx) { }
}
}