Correctly protect DefaultChannelPipeline nodes when concurrent removals happen due handlerAdded(...) throwing (#9530)

Motivation:

We need to update the doubly-linked list nodes while holding a lock via synchronized in all cases as otherwise we may end-up with a corrupted pipeline. We missed this when calling remove0(...) due handlerAdded(...) throwing an exception.

Modifications:

- Correctly hold lock while update node
- Add assert
- Add unit test

Result:

Fixes https://github.com/netty/netty/issues/9528
This commit is contained in:
Norman Maurer 2019-09-03 08:35:11 +02:00 committed by GitHub
parent 1039f69e53
commit affbdf7125
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 76 additions and 4 deletions

View File

@ -482,7 +482,8 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return ctx;
}
private static void remove0(AbstractChannelHandlerContext ctx) {
private void remove0(AbstractChannelHandlerContext ctx) {
assert Thread.holdsLock(this);
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
@ -611,7 +612,9 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} catch (Throwable t) {
boolean removed = false;
try {
remove0(ctx);
synchronized (this) {
remove0(ctx);
}
ctx.callHandlerRemoved();
removed = true;
} catch (Throwable t2) {
@ -1481,7 +1484,9 @@ public class DefaultChannelPipeline implements ChannelPipeline {
"Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
executor, ctx.name(), e);
}
remove0(ctx);
synchronized (this) {
remove0(ctx);
}
ctx.setRemoved();
}
}

View File

@ -43,6 +43,7 @@ import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.net.SocketAddress;
@ -73,11 +74,16 @@ import static org.junit.Assert.fail;
public class DefaultChannelPipelineTest {
private static final EventLoopGroup group = new DefaultEventLoopGroup(1);
private static EventLoopGroup group;
private Channel self;
private Channel peer;
@BeforeClass
public static void beforeClass() throws Exception {
group = new DefaultEventLoopGroup(1);
}
@AfterClass
public static void afterClass() throws Exception {
group.shutdownGracefully().sync();
@ -1637,6 +1643,67 @@ public class DefaultChannelPipelineTest {
channel2.close().syncUninterruptibly();
}
@Test(timeout = 5000)
public void testHandlerAddedFailedButHandlerStillRemoved() throws InterruptedException {
testHandlerAddedFailedButHandlerStillRemoved0(false);
}
@Test(timeout = 5000)
public void testHandlerAddedFailedButHandlerStillRemovedWithLaterRegister() throws InterruptedException {
testHandlerAddedFailedButHandlerStillRemoved0(true);
}
private static void testHandlerAddedFailedButHandlerStillRemoved0(boolean lateRegister)
throws InterruptedException {
EventExecutorGroup executorGroup = new DefaultEventExecutorGroup(16);
final int numHandlers = 32;
try {
Channel channel = new LocalChannel();
channel.config().setOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP, false);
if (!lateRegister) {
group.register(channel).sync();
}
channel.pipeline().addFirst(newHandler());
List<CountDownLatch> latchList = new ArrayList<CountDownLatch>(numHandlers);
for (int i = 0; i < numHandlers; i++) {
CountDownLatch latch = new CountDownLatch(1);
channel.pipeline().addFirst(executorGroup, "h" + i, new BadChannelHandler(latch));
latchList.add(latch);
}
if (lateRegister) {
group.register(channel).sync();
}
for (int i = 0; i < numHandlers; i++) {
// Wait until the latch was countDown which means handlerRemoved(...) was called.
latchList.get(i).await();
assertNull(channel.pipeline().get("h" + i));
}
} finally {
executorGroup.shutdownGracefully();
}
}
private static final class BadChannelHandler extends ChannelHandlerAdapter {
private final CountDownLatch latch;
BadChannelHandler(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
TimeUnit.MILLISECONDS.sleep(10);
throw new RuntimeException();
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
latch.countDown();
}
}
@Test(timeout = 5000)
public void handlerAddedStateUpdatedBeforeHandlerAddedDoneForceEventLoop() throws InterruptedException {
handlerAddedStateUpdatedBeforeHandlerAddedDone(true);