Only call handlerRemoved(...) if handlerAdded(...) was called during adding the handler to the pipeline. (#8684)
Motivation: Due a race in DefaultChannelPipeline / AbstractChannelHandlerContext it was possible to have only handlerRemoved(...) called during tearing down the pipeline, even when handlerAdded(...) was never called. We need to ensure we either call both of none to guarantee a proper lifecycle of the handler. Modifications: - Enforce handlerAdded(...) / handlerRemoved(...) semantics / ordering - Add unit test. Result: Fixes https://github.com/netty/netty/issues/8676 / https://github.com/netty/netty/issues/6536 .
This commit is contained in:
parent
6ed296b719
commit
d0891d08d7
@ -962,14 +962,17 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
|||||||
handlerState = REMOVE_COMPLETE;
|
handlerState = REMOVE_COMPLETE;
|
||||||
}
|
}
|
||||||
|
|
||||||
final void setAddComplete() {
|
final boolean setAddComplete() {
|
||||||
for (;;) {
|
for (;;) {
|
||||||
int oldState = handlerState;
|
int oldState = handlerState;
|
||||||
|
if (oldState == REMOVE_COMPLETE) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
// Ensure we never update when the handlerState is REMOVE_COMPLETE already.
|
// Ensure we never update when the handlerState is REMOVE_COMPLETE already.
|
||||||
// oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
|
// oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
|
||||||
// exposing ordering guarantees.
|
// exposing ordering guarantees.
|
||||||
if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
|
if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -979,6 +982,26 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
|||||||
assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
|
assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final void callHandlerAdded() throws Exception {
|
||||||
|
// We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
|
||||||
|
// any pipeline events ctx.handler() will miss them because the state will not allow it.
|
||||||
|
if (setAddComplete()) {
|
||||||
|
handler().handlerAdded(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final void callHandlerRemoved() throws Exception {
|
||||||
|
try {
|
||||||
|
// Only call handlerRemoved(...) if we called handlerAdded(...) before.
|
||||||
|
if (handlerState == ADD_COMPLETE) {
|
||||||
|
handler().handlerRemoved(this);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
// Mark the handler as removed in any case.
|
||||||
|
setRemoved();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
|
* Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
|
||||||
* yet. If not return {@code false} and if called or could not detect return {@code true}.
|
* yet. If not return {@code false} and if called or could not detect return {@code true}.
|
||||||
|
@ -631,19 +631,12 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
|
|
||||||
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
|
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
|
||||||
try {
|
try {
|
||||||
// We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
|
ctx.callHandlerAdded();
|
||||||
// any pipeline events ctx.handler() will miss them because the state will not allow it.
|
|
||||||
ctx.setAddComplete();
|
|
||||||
ctx.handler().handlerAdded(ctx);
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
boolean removed = false;
|
boolean removed = false;
|
||||||
try {
|
try {
|
||||||
remove0(ctx);
|
remove0(ctx);
|
||||||
try {
|
ctx.callHandlerRemoved();
|
||||||
ctx.handler().handlerRemoved(ctx);
|
|
||||||
} finally {
|
|
||||||
ctx.setRemoved();
|
|
||||||
}
|
|
||||||
removed = true;
|
removed = true;
|
||||||
} catch (Throwable t2) {
|
} catch (Throwable t2) {
|
||||||
if (logger.isWarnEnabled()) {
|
if (logger.isWarnEnabled()) {
|
||||||
@ -666,11 +659,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
|
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
|
||||||
// Notify the complete removal.
|
// Notify the complete removal.
|
||||||
try {
|
try {
|
||||||
try {
|
ctx.callHandlerRemoved();
|
||||||
ctx.handler().handlerRemoved(ctx);
|
|
||||||
} finally {
|
|
||||||
ctx.setRemoved();
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
fireExceptionCaught(new ChannelPipelineException(
|
fireExceptionCaught(new ChannelPipelineException(
|
||||||
ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
|
ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
|
||||||
|
@ -24,6 +24,7 @@ import io.netty.channel.ChannelHandler.Sharable;
|
|||||||
import io.netty.channel.embedded.EmbeddedChannel;
|
import io.netty.channel.embedded.EmbeddedChannel;
|
||||||
import io.netty.channel.local.LocalAddress;
|
import io.netty.channel.local.LocalAddress;
|
||||||
import io.netty.channel.local.LocalChannel;
|
import io.netty.channel.local.LocalChannel;
|
||||||
|
import io.netty.channel.local.LocalEventLoopGroup;
|
||||||
import io.netty.channel.local.LocalServerChannel;
|
import io.netty.channel.local.LocalServerChannel;
|
||||||
import io.netty.channel.nio.NioEventLoopGroup;
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||||
@ -1149,6 +1150,72 @@ public class DefaultChannelPipelineTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test for https://github.com/netty/netty/issues/8676.
|
||||||
|
@Test
|
||||||
|
public void testHandlerRemovedOnlyCalledWhenHandlerAddedCalled() throws Exception {
|
||||||
|
EventLoopGroup group = new DefaultEventLoopGroup(1);
|
||||||
|
try {
|
||||||
|
final AtomicReference<Error> errorRef = new AtomicReference<Error>();
|
||||||
|
|
||||||
|
// As this only happens via a race we will verify 500 times. This was good enough to have it failed most of
|
||||||
|
// the time.
|
||||||
|
for (int i = 0; i < 500; i++) {
|
||||||
|
|
||||||
|
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||||
|
group.register(pipeline.channel()).sync();
|
||||||
|
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
pipeline.addLast(new ChannelInboundHandlerAdapter() {
|
||||||
|
@Override
|
||||||
|
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
// Block just for a bit so we have a chance to trigger the race mentioned in the issue.
|
||||||
|
latch.await(50, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Close the pipeline which will call destroy0(). This will remove each handler in the pipeline and
|
||||||
|
// should call handlerRemoved(...) if and only if handlerAdded(...) was called for the handler before.
|
||||||
|
pipeline.close();
|
||||||
|
|
||||||
|
pipeline.addLast(new ChannelInboundHandlerAdapter() {
|
||||||
|
private boolean handerAddedCalled;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handlerAdded(ChannelHandlerContext ctx) {
|
||||||
|
handerAddedCalled = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handlerRemoved(ChannelHandlerContext ctx) {
|
||||||
|
if (!handerAddedCalled) {
|
||||||
|
errorRef.set(new AssertionError(
|
||||||
|
"handlerRemoved(...) called without handlerAdded(...) before"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
latch.countDown();
|
||||||
|
|
||||||
|
pipeline.channel().closeFuture().syncUninterruptibly();
|
||||||
|
|
||||||
|
// Schedule something on the EventLoop to ensure all other scheduled tasks had a chance to complete.
|
||||||
|
pipeline.channel().eventLoop().submit(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
// NOOP
|
||||||
|
}
|
||||||
|
}).syncUninterruptibly();
|
||||||
|
Error error = errorRef.get();
|
||||||
|
if (error != null) {
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
group.shutdownGracefully();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 5000)
|
@Test(timeout = 5000)
|
||||||
public void handlerAddedStateUpdatedBeforeHandlerAddedDoneForceEventLoop() throws InterruptedException {
|
public void handlerAddedStateUpdatedBeforeHandlerAddedDoneForceEventLoop() throws InterruptedException {
|
||||||
handlerAddedStateUpdatedBeforeHandlerAddedDone(true);
|
handlerAddedStateUpdatedBeforeHandlerAddedDone(true);
|
||||||
|
Loading…
Reference in New Issue
Block a user