Allow to delay registration when creating a EmbeddedChannel

Motivation:

Some ChannelOptions must be set before the Channel is really registered to have the desired effect.

Modifications:

Add another constructor argument which allows to not register the EmbeddedChannel to its EventLoop until the user calls register().

Result:

More flexible usage of EmbeddedChannel. Also Fixes [#6968].
This commit is contained in:
Norman Maurer 2017-07-18 16:14:30 +02:00
parent 08748344d8
commit ef22e65b57
3 changed files with 93 additions and 24 deletions

View File

@ -160,27 +160,32 @@ public class SslHandlerTest {
}
public void test(final boolean dropChannelActive) throws Exception {
SSLEngine engine = SSLContext.getDefault().createSSLEngine();
engine.setUseClientMode(true);
SSLEngine engine = SSLContext.getDefault().createSSLEngine();
engine.setUseClientMode(true);
EmbeddedChannel ch = new EmbeddedChannel(
this,
new SslHandler(engine),
new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (!dropChannelActive) {
ctx.fireChannelActive();
}
}
}
);
ch.config().setAutoRead(false);
assertFalse(ch.config().isAutoRead());
EmbeddedChannel ch = new EmbeddedChannel(false, false,
this,
new SslHandler(engine),
new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (!dropChannelActive) {
ctx.fireChannelActive();
}
}
}
);
ch.config().setAutoRead(false);
assertFalse(ch.config().isAutoRead());
assertTrue(ch.writeOutbound(Unpooled.EMPTY_BUFFER));
assertTrue(readIssued);
assertTrue(ch.finishAndReleaseAll());
ch.register();
assertTrue(readIssued);
readIssued = false;
assertTrue(ch.writeOutbound(Unpooled.EMPTY_BUFFER));
assertTrue(readIssued);
assertTrue(ch.finishAndReleaseAll());
}
}

View File

@ -110,6 +110,19 @@ public class EmbeddedChannel extends AbstractChannel {
this(EmbeddedChannelId.INSTANCE, hasDisconnect, handlers);
}
/**
* Create a new instance with the pipeline initialized with the specified handlers.
*
* @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
* constructor. If {@code false} the user will need to call {@link #register()}.
* @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
* to {@link #close()}, {@link false} otherwise.
* @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
*/
public EmbeddedChannel(boolean register, boolean hasDisconnect, ChannelHandler... handlers) {
this(EmbeddedChannelId.INSTANCE, register, hasDisconnect, handlers);
}
/**
* Create a new instance with the channel ID set to the given ID and the pipeline
* initialized with the specified handlers.
@ -117,7 +130,7 @@ public class EmbeddedChannel extends AbstractChannel {
* @param channelId the {@link ChannelId} that will be used to identify this channel
* @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
*/
public EmbeddedChannel(ChannelId channelId, final ChannelHandler... handlers) {
public EmbeddedChannel(ChannelId channelId, ChannelHandler... handlers) {
this(channelId, false, handlers);
}
@ -130,11 +143,27 @@ public class EmbeddedChannel extends AbstractChannel {
* to {@link #close()}, {@link false} otherwise.
* @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
*/
public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, final ChannelHandler... handlers) {
public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, ChannelHandler... handlers) {
this(channelId, true, hasDisconnect, handlers);
}
/**
* Create a new instance with the channel ID set to the given ID and the pipeline
* initialized with the specified handlers.
*
* @param channelId the {@link ChannelId} that will be used to identify this channel
* @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
* constructor. If {@code false} the user will need to call {@link #register()}.
* @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
* to {@link #close()}, {@link false} otherwise.
* @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
*/
public EmbeddedChannel(ChannelId channelId, boolean register, boolean hasDisconnect,
final ChannelHandler... handlers) {
super(null, channelId);
metadata = metadata(hasDisconnect);
config = new DefaultChannelConfig(this);
setup(handlers);
setup(register, handlers);
}
/**
@ -152,14 +181,14 @@ public class EmbeddedChannel extends AbstractChannel {
super(null, channelId);
metadata = metadata(hasDisconnect);
this.config = ObjectUtil.checkNotNull(config, "config");
setup(handlers);
setup(true, handlers);
}
private static ChannelMetadata metadata(boolean hasDisconnect) {
return hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
}
private void setup(final ChannelHandler... handlers) {
private void setup(boolean register, final ChannelHandler... handlers) {
ObjectUtil.checkNotNull(handlers, "handlers");
ChannelPipeline p = pipeline();
p.addLast(new ChannelInitializer<Channel>() {
@ -174,9 +203,22 @@ public class EmbeddedChannel extends AbstractChannel {
}
}
});
if (register) {
ChannelFuture future = loop.register(this);
assert future.isDone();
}
}
/**
* Register this {@code Channel} on its {@link EventLoop}.
*/
public void register() throws Exception {
ChannelFuture future = loop.register(this);
assert future.isDone();
Throwable cause = future.cause();
if (cause != null) {
PlatformDependent.throwException(cause);
}
}
@Override

View File

@ -53,6 +53,28 @@ import io.netty.util.concurrent.ScheduledFuture;
public class EmbeddedChannelTest {
@Test
public void testNotRegistered() throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(false, false);
assertFalse(channel.isRegistered());
channel.register();
assertTrue(channel.isRegistered());
assertFalse(channel.finish());
}
@Test
public void testRegistered() throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(true, false);
assertTrue(channel.isRegistered());
try {
channel.register();
fail();
} catch (IllegalStateException expected) {
// This is expected the channel is registered already on an EventLoop.
}
assertFalse(channel.finish());
}
@Test(timeout = 2000)
public void promiseDoesNotInfiniteLoop() throws InterruptedException {
EmbeddedChannel channel = new EmbeddedChannel();