DefaultChannelPipeline will not invoke handler if events are fired from handlerAdded

Motiviation:
DefaultChannelPipeline and AbstractChannelHandlerContext maintain state
which indicates if a ChannelHandler should be invoked or not. However
the state is updated to allow the handler to be invoked only after the
handlerAdded method completes. If the handlerAdded method generates
events which may result in other methods being invoked on that handler
they will be missed.

Modifications:
- DefaultChannelPipeline should set the state before calling
handlerAdded

Result:
DefaultChannelPipeline will allow events to be processed during the
handlerAdded process.
This commit is contained in:
Scott Mitchell 2018-01-22 11:09:17 -08:00 committed by Norman Maurer
parent 27ff15319c
commit 2d815fa752
2 changed files with 68 additions and 2 deletions

View File

@ -611,8 +611,10 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try { try {
ctx.handler().handlerAdded(ctx); // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
// any pipeline events ctx.handler() will miss them because the state will not allow it.
ctx.setAddComplete(); ctx.setAddComplete();
ctx.handler().handlerAdded(ctx);
} catch (Throwable t) { } catch (Throwable t) {
boolean removed = false; boolean removed = false;
try { try {

View File

@ -59,7 +59,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
import static org.junit.Assert.*; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class DefaultChannelPipelineTest { public class DefaultChannelPipelineTest {
@ -1102,6 +1109,63 @@ public class DefaultChannelPipelineTest {
} }
} }
@Test(timeout = 5000)
public void handlerAddedStateUpdatedBeforeHandlerAddedDoneForceEventLoop() throws InterruptedException {
handlerAddedStateUpdatedBeforeHandlerAddedDone(true);
}
@Test(timeout = 5000)
public void handlerAddedStateUpdatedBeforeHandlerAddedDoneOnCallingThread() throws InterruptedException {
handlerAddedStateUpdatedBeforeHandlerAddedDone(false);
}
private static void handlerAddedStateUpdatedBeforeHandlerAddedDone(boolean executeInEventLoop)
throws InterruptedException {
final ChannelPipeline pipeline = new LocalChannel().pipeline();
final Object userEvent = new Object();
final Object writeObject = new Object();
final CountDownLatch doneLatch = new CountDownLatch(1);
group.register(pipeline.channel());
Runnable r = new Runnable() {
@Override
public void run() {
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt == userEvent) {
ctx.write(writeObject);
}
ctx.fireUserEventTriggered(evt);
}
});
pipeline.addFirst(new ChannelDuplexHandler() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
ctx.fireUserEventTriggered(userEvent);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg == writeObject) {
doneLatch.countDown();
}
ctx.write(msg, promise);
}
});
}
};
if (executeInEventLoop) {
pipeline.channel().eventLoop().execute(r);
} else {
r.run();
}
doneLatch.await();
}
private static final class TestTask implements Runnable { private static final class TestTask implements Runnable {
private final ChannelPipeline pipeline; private final ChannelPipeline pipeline;