[#3921] EmbeddedChannel should add ChannelHandlers once registered
Motivation: Currently in EmbeddedChannel we add the ChannelHandlers before the Channel is registered which leads to have the handlerAdded(...) callback be called from outside the EventLoop and also prevent the user to obtain a reference to the EventLoop in the callback itself. Modifications: Delay adding ChannelHandlers until EmbeddedChannel is registered. Result: Correctly call handlerAdded(...) after EmbeddedChannel is registered.
This commit is contained in:
parent
5804cb3e1c
commit
287ac6d328
@ -22,6 +22,7 @@ import io.netty.channel.ChannelFuture;
|
|||||||
import io.netty.channel.ChannelHandler;
|
import io.netty.channel.ChannelHandler;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
|
import io.netty.channel.ChannelInitializer;
|
||||||
import io.netty.channel.ChannelMetadata;
|
import io.netty.channel.ChannelMetadata;
|
||||||
import io.netty.channel.ChannelOutboundBuffer;
|
import io.netty.channel.ChannelOutboundBuffer;
|
||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
@ -62,7 +63,7 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
*
|
*
|
||||||
* @param handlers the @link ChannelHandler}s which will be add in the {@link ChannelPipeline}
|
* @param handlers the @link ChannelHandler}s which will be add in the {@link ChannelPipeline}
|
||||||
*/
|
*/
|
||||||
public EmbeddedChannel(ChannelHandler... handlers) {
|
public EmbeddedChannel(final ChannelHandler... handlers) {
|
||||||
super(null);
|
super(null);
|
||||||
|
|
||||||
if (handlers == null) {
|
if (handlers == null) {
|
||||||
@ -70,20 +71,33 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
int nHandlers = 0;
|
int nHandlers = 0;
|
||||||
ChannelPipeline p = pipeline();
|
|
||||||
for (ChannelHandler h: handlers) {
|
for (ChannelHandler h: handlers) {
|
||||||
if (h == null) {
|
if (h == null) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
nHandlers ++;
|
nHandlers ++;
|
||||||
p.addLast(h);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ChannelPipeline p = pipeline();
|
||||||
|
p.addLast(new ChannelInitializer<Channel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(Channel ch) throws Exception {
|
||||||
|
ChannelPipeline pipeline = ch.pipeline();
|
||||||
|
for (ChannelHandler h: handlers) {
|
||||||
|
if (h == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
pipeline.addLast(h);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
if (nHandlers == 0) {
|
if (nHandlers == 0) {
|
||||||
throw new IllegalArgumentException("handlers is empty.");
|
throw new IllegalArgumentException("handlers is empty.");
|
||||||
}
|
}
|
||||||
|
|
||||||
loop.register(this);
|
ChannelFuture future = loop.register(this);
|
||||||
|
assert future.isDone();
|
||||||
p.addLast(new LastInboundHandler());
|
p.addLast(new LastInboundHandler());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,6 +17,7 @@ package io.netty.channel.embedded;
|
|||||||
|
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelHandler;
|
import io.netty.channel.ChannelHandler;
|
||||||
|
import io.netty.channel.ChannelHandlerAdapter;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
import io.netty.channel.ChannelInitializer;
|
import io.netty.channel.ChannelInitializer;
|
||||||
@ -29,6 +30,7 @@ import org.junit.Test;
|
|||||||
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
public class EmbeddedChannelTest {
|
public class EmbeddedChannelTest {
|
||||||
|
|
||||||
@ -95,4 +97,29 @@ public class EmbeddedChannelTest {
|
|||||||
ch.finish();
|
ch.finish();
|
||||||
Assert.assertTrue(future.isCancelled());
|
Assert.assertTrue(future.isCancelled());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 3000)
|
||||||
|
public void testHandlerAddedExecutedInEventLoop() throws Throwable {
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
|
||||||
|
final ChannelHandler handler = new ChannelHandlerAdapter() {
|
||||||
|
@Override
|
||||||
|
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
try {
|
||||||
|
Assert.assertTrue(ctx.executor().inEventLoop());
|
||||||
|
} catch (Throwable cause) {
|
||||||
|
error.set(cause);
|
||||||
|
} finally {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
EmbeddedChannel channel = new EmbeddedChannel(handler);
|
||||||
|
Assert.assertFalse(channel.finish());
|
||||||
|
latch.await();
|
||||||
|
Throwable cause = error.get();
|
||||||
|
if (cause != null) {
|
||||||
|
throw cause;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user