Fix a race condition where handler is removed before unregistration
Related: #3156

Motivation:

Let's say we have a channel with the following pipeline configuration:

    HEAD --> [E1] H1 --> [E2] H2 --> TAIL

When the channel is deregistered, the channelUnregistered() methods of H1 and H2 will be invoked from the executor threads of E1 and E2 respectively. To ensure that the channelUnregistered() methods are invoked from the correct threads, new one-time tasks are created accordingly and scheduled via Executor.execute(Runnable).

As soon as the one-time tasks are scheduled, DefaultChannelPipeline.fireChannelUnregistered() starts to remove all handlers from the pipeline via teardownAll(). This removal is performed in the reverse order of event propagation, i.e. H2 is removed first, and then H1.

If the channelUnregistered() event has been passed to H2 before H2 is removed, the user does not see any problem. If H2 has been removed before the channelUnregistered() event is passed to it, the user will often see the following confusing warning message:

    An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.

Modifications:

To ensure that the handlers are removed *after* all events are propagated, traverse the pipeline in ascending order before performing the actual removal.

Result:

A user does not get the confusing warning message anymore.
commit c774b65f86
parent 53c1a30194
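For illustration only (this sketch is not part of the commit): a minimal Java setup of the pipeline described in the Motivation, with H1 and H2 bound to separate EventExecutorGroups E1 and E2. The class and handler names are hypothetical.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;

// Hypothetical setup matching the pipeline in the motivation:
// HEAD --> [E1] H1 --> [E2] H2 --> TAIL
public final class UnregisterExample {

    static final class NamedHandler extends ChannelInboundHandlerAdapter {
        private final String name;

        NamedHandler(String name) {
            this.name = name;
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            // Before this fix, H2 could be removed by teardownAll() before this event
            // reached it, producing the confusing warning quoted in the commit message.
            System.out.println(name + " unregistered on " + Thread.currentThread().getName());
            ctx.fireChannelUnregistered();
        }
    }

    public static void configure(ChannelPipeline pipeline) {
        EventExecutorGroup e1 = new DefaultEventExecutorGroup(1);
        EventExecutorGroup e2 = new DefaultEventExecutorGroup(1);
        pipeline.addLast(e1, "h1", new NamedHandler("H1"));
        pipeline.addLast(e2, "h2", new NamedHandler("H2"));
    }

    private UnregisterExample() { }
}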
transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java

@@ -15,8 +15,6 @@
  */
 package io.netty.channel;
 
-import static io.netty.channel.DefaultChannelPipeline.logger;
-
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.util.DefaultAttributeMap;
 import io.netty.util.Recycler;
@@ -29,6 +27,8 @@ import io.netty.util.internal.StringUtil;
 
 import java.net.SocketAddress;
 
+import static io.netty.channel.DefaultChannelPipeline.*;
+
 abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext {
 
     volatile AbstractChannelHandlerContext next;
@@ -82,31 +82,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
         this.outbound = outbound;
     }
 
-    /** Invocation initiated by {@link DefaultChannelPipeline#teardownAll()}}. */
-    void teardown() {
-        EventExecutor executor = executor();
-        if (executor.inEventLoop()) {
-            teardown0();
-        } else {
-            executor.execute(new Runnable() {
-                @Override
-                public void run() {
-                    teardown0();
-                }
-            });
-        }
-    }
-
-    private void teardown0() {
-        AbstractChannelHandlerContext prev = this.prev;
-        if (prev != null) {
-            synchronized (pipeline) {
-                pipeline.remove0(this);
-            }
-            prev.teardown();
-        }
-    }
-
     @Override
     public Channel channel() {
         return channel;
transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java

@@ -19,6 +19,7 @@ import io.netty.channel.Channel.Unsafe;
 import io.netty.util.ReferenceCountUtil;
 import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.EventExecutorGroup;
+import io.netty.util.internal.OneTimeTask;
 import io.netty.util.internal.PlatformDependent;
 import io.netty.util.internal.StringUtil;
 import io.netty.util.internal.logging.InternalLogger;
@@ -739,18 +740,76 @@ final class DefaultChannelPipeline implements ChannelPipeline {
 
         // Remove all handlers sequentially if channel is closed and unregistered.
         if (!channel.isOpen()) {
-            teardownAll();
+            destroy();
         }
         return this;
     }
 
     /**
-     * Removes all handlers from the pipeline one by one from tail (exclusive) to head (inclusive) to trigger
-     * handlerRemoved(). Note that the tail handler is excluded because it's neither an outbound handler nor it
-     * does anything in handlerRemoved().
+     * Removes all handlers from the pipeline one by one from tail (exclusive) to head (exclusive) to trigger
+     * handlerRemoved().
+     *
+     * Note that we traverse up the pipeline ({@link #destroyUp(AbstractChannelHandlerContext)})
+     * before traversing down ({@link #destroyDown(Thread, AbstractChannelHandlerContext)}) so that
+     * the handlers are removed after all events are handled.
+     *
+     * See: https://github.com/netty/netty/issues/3156
      */
-    private void teardownAll() {
-        tail.prev.teardown();
+    private void destroy() {
+        destroyUp(head.next);
+    }
+
+    private void destroyUp(AbstractChannelHandlerContext ctx) {
+        final Thread currentThread = Thread.currentThread();
+        final AbstractChannelHandlerContext tail = this.tail;
+        for (;;) {
+            if (ctx == tail) {
+                destroyDown(currentThread, tail.prev);
+                break;
+            }
+
+            final EventExecutor executor = ctx.executor();
+            if (!executor.inEventLoop(currentThread)) {
+                final AbstractChannelHandlerContext finalCtx = ctx;
+                executor.execute(new OneTimeTask() {
+                    @Override
+                    public void run() {
+                        destroyUp(finalCtx);
+                    }
+                });
+                break;
+            }
+
+            ctx = ctx.next;
+        }
+    }
+
+    private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx) {
+        // We have reached at tail; now traverse backwards.
+        final AbstractChannelHandlerContext head = this.head;
+        for (;;) {
+            if (ctx == head) {
+                break;
+            }
+
+            final EventExecutor executor = ctx.executor();
+            if (executor.inEventLoop(currentThread)) {
+                synchronized (this) {
+                    remove0(ctx);
+                }
+            } else {
+                final AbstractChannelHandlerContext finalCtx = ctx;
+                executor.execute(new OneTimeTask() {
+                    @Override
+                    public void run() {
+                        destroyDown(Thread.currentThread(), finalCtx);
+                    }
+                });
+                break;
+            }
+
+            ctx = ctx.prev;
+        }
     }
 
     @Override
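For illustration only (plain JDK code, not part of the commit): the new up-then-down traversal relies on the fact that a single-threaded executor runs tasks in submission order. Because destroyUp() walks the pipeline in the same direction as the channelUnregistered() event, the removal work it schedules on each handler's executor is queued behind the event task that was submitted earlier. The names below are hypothetical.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK sketch of the FIFO ordering guarantee that destroyUp()/destroyDown()
// build on: a task submitted to a single-threaded executor first (deliver
// channelUnregistered() to H2) always runs before a task submitted later
// (remove H2 from the pipeline).
public final class FifoOrderingDemo {
    public static void main(String[] args) {
        ExecutorService e2 = Executors.newSingleThreadExecutor();

        // Submitted first: deliver the event to H2.
        e2.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("1. H2.channelUnregistered()");
            }
        });

        // Submitted later: remove H2.
        e2.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("2. remove H2 (handlerRemoved())");
            }
        });

        e2.shutdown(); // Already-submitted tasks still run, in order.
    }
}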