More graceful registration failure

- Fixes #2060
- Ensure to return a future/promise implementation that does not fail with 'not registered to an event loop' error for registration operations
- If there is no usable event loop available, GlobalEventExecutor.INSTANCE is used as a fallback.
- Add VoidChannel, which is used when an instantiation of a channel fails.
This commit is contained in:
Trustin Lee 2013-12-21 18:08:58 +09:00
parent e1c47632e8
commit 4e05c52c2e
3 changed files with 315 additions and 13 deletions

View File

@ -22,9 +22,12 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.VoidChannel;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.StringUtil;
import java.net.InetAddress;
@ -228,16 +231,23 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
}
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regPromise = initAndRegister();
final Channel channel = regPromise.channel();
final ChannelPromise promise = channel.newPromise();
if (regPromise.isDone()) {
doBind0(regPromise, channel, localAddress, promise);
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
final ChannelPromise promise;
if (regFuture.isDone()) {
promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
} else {
regPromise.addListener(new ChannelFutureListener() {
// Registration future is almost always fulfilled already, but just in case it's not.
promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doBind0(future, channel, localAddress, promise);
doBind0(regFuture, channel, localAddress, promise);
}
});
}
@ -248,7 +258,13 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
abstract Channel createChannel();
final ChannelFuture initAndRegister() {
Channel channel = createChannel();
Channel channel;
try {
channel = createChannel();
} catch (Throwable t) {
return VoidChannel.INSTANCE.newFailedFuture(t);
}
try {
init(channel);
} catch (Throwable t) {
@ -256,9 +272,9 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
return channel.newFailedFuture(t);
}
ChannelPromise regPromise = channel.newPromise();
channel.unsafe().register(regPromise);
if (regPromise.cause() != null) {
ChannelPromise regFuture = channel.newPromise();
channel.unsafe().register(regFuture);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
@ -275,7 +291,7 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regPromise;
return regFuture;
}
abstract void init(Channel channel) throws Exception;
@ -286,7 +302,6 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {

View File

@ -0,0 +1,172 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.StringUtil;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
/**
* A {@link Channel} that represents a non-existing {@link Channel} which could not be instantiated successfully.
*/
public final class VoidChannel extends AbstractChannel {
public static final VoidChannel INSTANCE = new VoidChannel();
private VoidChannel() {
super(null, new AbstractEventLoop(null) {
private final ChannelHandlerInvoker invoker =
new DefaultChannelHandlerInvoker(GlobalEventExecutor.INSTANCE);
@Override
@Deprecated
public void shutdown() {
GlobalEventExecutor.INSTANCE.shutdown();
}
@Override
public ChannelHandlerInvoker asInvoker() {
return invoker;
}
@Override
public boolean inEventLoop(Thread thread) {
return GlobalEventExecutor.INSTANCE.inEventLoop(thread);
}
@Override
public boolean isShuttingDown() {
return GlobalEventExecutor.INSTANCE.isShuttingDown();
}
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
return GlobalEventExecutor.INSTANCE.shutdownGracefully(quietPeriod, timeout, unit);
}
@Override
public Future<?> terminationFuture() {
return GlobalEventExecutor.INSTANCE.terminationFuture();
}
@Override
public boolean isShutdown() {
return GlobalEventExecutor.INSTANCE.isShutdown();
}
@Override
public boolean isTerminated() {
return GlobalEventExecutor.INSTANCE.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return GlobalEventExecutor.INSTANCE.awaitTermination(timeout, unit);
}
@Override
public void execute(Runnable command) {
GlobalEventExecutor.INSTANCE.execute(command);
}
});
}
@Override
protected AbstractUnsafe newUnsafe() {
return new AbstractUnsafe() {
@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
reject();
}
};
}
@Override
protected boolean isCompatible(EventLoop loop) {
return true;
}
@Override
protected SocketAddress localAddress0() {
return reject();
}
@Override
protected SocketAddress remoteAddress0() {
return reject();
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
reject();
}
@Override
protected void doDisconnect() throws Exception {
reject();
}
@Override
protected void doClose() throws Exception {
reject();
}
@Override
protected void doBeginRead() throws Exception {
reject();
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
reject();
}
@Override
public ChannelConfig config() {
return reject();
}
@Override
public boolean isOpen() {
return reject();
}
@Override
public boolean isActive() {
return reject();
}
@Override
public ChannelMetadata metadata() {
return reject();
}
@Override
public String toString() {
return StringUtil.simpleClassName(this);
}
private static <T> T reject() {
throw new UnsupportedOperationException(
StringUtil.simpleClassName(VoidChannel.class) +
" is only for the representation of a non-existing " +
StringUtil.simpleClassName(Channel.class) + '.');
}
}

View File

@ -0,0 +1,115 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.oio;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.oio.OioServerSocketChannel;
import io.netty.channel.socket.oio.OioSocketChannel;
import io.netty.util.NetUtil;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
public class OioEventLoopTest {
@Test
public void testTooManyServerChannels() throws Exception {
EventLoopGroup g = new OioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.channel(OioServerSocketChannel.class);
b.group(g);
b.childHandler(new ChannelHandlerAdapter());
ChannelFuture f1 = b.bind(0);
f1.sync();
ChannelFuture f2 = b.bind(0);
f2.await();
assertThat(f2.cause(), is(instanceOf(ChannelException.class)));
assertThat(f2.cause().getMessage().toLowerCase(), containsString("too many channels"));
final CountDownLatch notified = new CountDownLatch(1);
f2.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
notified.countDown();
}
});
notified.await();
g.shutdownGracefully();
}
@Test
public void testTooManyClientChannels() throws Exception {
EventLoopGroup g = new OioEventLoopGroup(1);
ServerBootstrap sb = new ServerBootstrap();
sb.channel(OioServerSocketChannel.class);
sb.group(g);
sb.childHandler(new ChannelHandlerAdapter());
ChannelFuture f1 = sb.bind(0);
f1.sync();
Bootstrap cb = new Bootstrap();
cb.channel(OioSocketChannel.class);
cb.group(g);
cb.handler(new ChannelHandlerAdapter());
ChannelFuture f2 = cb.connect(NetUtil.LOCALHOST, ((InetSocketAddress) f1.channel().localAddress()).getPort());
f2.await();
assertThat(f2.cause(), is(instanceOf(ChannelException.class)));
assertThat(f2.cause().getMessage().toLowerCase(), containsString("too many channels"));
final CountDownLatch notified = new CountDownLatch(1);
f2.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
notified.countDown();
}
});
notified.await();
g.shutdownGracefully();
}
@Test
public void testTooManyAcceptedChannels() throws Exception {
EventLoopGroup g = new OioEventLoopGroup(1);
ServerBootstrap sb = new ServerBootstrap();
sb.channel(OioServerSocketChannel.class);
sb.group(g);
sb.childHandler(new ChannelHandlerAdapter());
ChannelFuture f1 = sb.bind(0);
f1.sync();
Socket s = new Socket(NetUtil.LOCALHOST, ((InetSocketAddress) f1.channel().localAddress()).getPort());
assertThat(s.getInputStream().read(), is(-1));
s.close();
g.shutdownGracefully();
}
}