From 4e05c52c2e7447f3c524eb928877dd941fea1929 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Sat, 21 Dec 2013 18:08:58 +0900 Subject: [PATCH] More graceful registration failure - Fixes #2060 - Ensure to return a future/promise implementation that does not fail with 'not registered to an event loop' error for registration operations - If there is no usable event loop available, GlobalEventExecutor.INSTANCE is used as a fallback. - Add VoidChannel, which is used when an instantiation of a channel fails. --- .../io/netty/bootstrap/AbstractBootstrap.java | 41 +++-- .../java/io/netty/channel/VoidChannel.java | 172 ++++++++++++++++++ .../netty/channel/oio/OioEventLoopTest.java | 115 ++++++++++++ 3 files changed, 315 insertions(+), 13 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/VoidChannel.java create mode 100644 transport/src/test/java/io/netty/channel/oio/OioEventLoopTest.java diff --git a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java index f1615afe82..10fc05df27 100644 --- a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java @@ -22,9 +22,12 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelPromise; import io.netty.channel.EventLoopGroup; +import io.netty.channel.VoidChannel; import io.netty.util.AttributeKey; import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.internal.StringUtil; import java.net.InetAddress; @@ -228,16 +231,23 @@ public abstract class AbstractBootstrap, C ext } private ChannelFuture doBind(final SocketAddress localAddress) { - final ChannelFuture regPromise = initAndRegister(); - final Channel channel = regPromise.channel(); - final ChannelPromise promise = channel.newPromise(); - if (regPromise.isDone()) { - doBind0(regPromise, channel, localAddress, promise); + final ChannelFuture regFuture = initAndRegister(); + final Channel channel = regFuture.channel(); + if (regFuture.cause() != null) { + return regFuture; + } + + final ChannelPromise promise; + if (regFuture.isDone()) { + promise = channel.newPromise(); + doBind0(regFuture, channel, localAddress, promise); } else { - regPromise.addListener(new ChannelFutureListener() { + // Registration future is almost always fulfilled already, but just in case it's not. + promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE); + regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - doBind0(future, channel, localAddress, promise); + doBind0(regFuture, channel, localAddress, promise); } }); } @@ -248,7 +258,13 @@ public abstract class AbstractBootstrap, C ext abstract Channel createChannel(); final ChannelFuture initAndRegister() { - Channel channel = createChannel(); + Channel channel; + try { + channel = createChannel(); + } catch (Throwable t) { + return VoidChannel.INSTANCE.newFailedFuture(t); + } + try { init(channel); } catch (Throwable t) { @@ -256,9 +272,9 @@ public abstract class AbstractBootstrap, C ext return channel.newFailedFuture(t); } - ChannelPromise regPromise = channel.newPromise(); - channel.unsafe().register(regPromise); - if (regPromise.cause() != null) { + ChannelPromise regFuture = channel.newPromise(); + channel.unsafe().register(regFuture); + if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { @@ -275,7 +291,7 @@ public abstract class AbstractBootstrap, C ext // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. - return regPromise; + return regFuture; } abstract void init(Channel channel) throws Exception; @@ -286,7 +302,6 @@ public abstract class AbstractBootstrap, C ext // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. - channel.eventLoop().execute(new Runnable() { @Override public void run() { diff --git a/transport/src/main/java/io/netty/channel/VoidChannel.java b/transport/src/main/java/io/netty/channel/VoidChannel.java new file mode 100644 index 0000000000..fee1b343a1 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/VoidChannel.java @@ -0,0 +1,172 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.internal.StringUtil; + +import java.net.SocketAddress; +import java.util.concurrent.TimeUnit; + +/** + * A {@link Channel} that represents a non-existing {@link Channel} which could not be instantiated successfully. + */ +public final class VoidChannel extends AbstractChannel { + + public static final VoidChannel INSTANCE = new VoidChannel(); + + private VoidChannel() { + super(null, new AbstractEventLoop(null) { + private final ChannelHandlerInvoker invoker = + new DefaultChannelHandlerInvoker(GlobalEventExecutor.INSTANCE); + + @Override + @Deprecated + public void shutdown() { + GlobalEventExecutor.INSTANCE.shutdown(); + } + + @Override + public ChannelHandlerInvoker asInvoker() { + return invoker; + } + + @Override + public boolean inEventLoop(Thread thread) { + return GlobalEventExecutor.INSTANCE.inEventLoop(thread); + } + + @Override + public boolean isShuttingDown() { + return GlobalEventExecutor.INSTANCE.isShuttingDown(); + } + + @Override + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + return GlobalEventExecutor.INSTANCE.shutdownGracefully(quietPeriod, timeout, unit); + } + + @Override + public Future terminationFuture() { + return GlobalEventExecutor.INSTANCE.terminationFuture(); + } + + @Override + public boolean isShutdown() { + return GlobalEventExecutor.INSTANCE.isShutdown(); + } + + @Override + public boolean isTerminated() { + return GlobalEventExecutor.INSTANCE.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return GlobalEventExecutor.INSTANCE.awaitTermination(timeout, unit); + } + + @Override + public void execute(Runnable command) { + GlobalEventExecutor.INSTANCE.execute(command); + } + }); + } + + @Override + protected AbstractUnsafe newUnsafe() { + return new AbstractUnsafe() { + @Override + public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + reject(); + } + }; + } + + @Override + protected boolean isCompatible(EventLoop loop) { + return true; + } + + @Override + protected SocketAddress localAddress0() { + return reject(); + } + + @Override + protected SocketAddress remoteAddress0() { + return reject(); + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + reject(); + } + + @Override + protected void doDisconnect() throws Exception { + reject(); + } + + @Override + protected void doClose() throws Exception { + reject(); + } + + @Override + protected void doBeginRead() throws Exception { + reject(); + } + + @Override + protected void doWrite(ChannelOutboundBuffer in) throws Exception { + reject(); + } + + @Override + public ChannelConfig config() { + return reject(); + } + + @Override + public boolean isOpen() { + return reject(); + } + + @Override + public boolean isActive() { + return reject(); + } + + @Override + public ChannelMetadata metadata() { + return reject(); + } + + @Override + public String toString() { + return StringUtil.simpleClassName(this); + } + + private static T reject() { + throw new UnsupportedOperationException( + StringUtil.simpleClassName(VoidChannel.class) + + " is only for the representation of a non-existing " + + StringUtil.simpleClassName(Channel.class) + '.'); + } +} diff --git a/transport/src/test/java/io/netty/channel/oio/OioEventLoopTest.java b/transport/src/test/java/io/netty/channel/oio/OioEventLoopTest.java new file mode 100644 index 0000000000..54e8201d27 --- /dev/null +++ b/transport/src/test/java/io/netty/channel/oio/OioEventLoopTest.java @@ -0,0 +1,115 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel.oio; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.oio.OioServerSocketChannel; +import io.netty.channel.socket.oio.OioSocketChannel; +import io.netty.util.NetUtil; +import org.junit.Test; + +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; + +public class OioEventLoopTest { + @Test + public void testTooManyServerChannels() throws Exception { + EventLoopGroup g = new OioEventLoopGroup(1); + ServerBootstrap b = new ServerBootstrap(); + b.channel(OioServerSocketChannel.class); + b.group(g); + b.childHandler(new ChannelHandlerAdapter()); + ChannelFuture f1 = b.bind(0); + f1.sync(); + + ChannelFuture f2 = b.bind(0); + f2.await(); + + assertThat(f2.cause(), is(instanceOf(ChannelException.class))); + assertThat(f2.cause().getMessage().toLowerCase(), containsString("too many channels")); + + final CountDownLatch notified = new CountDownLatch(1); + f2.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + notified.countDown(); + } + }); + + notified.await(); + g.shutdownGracefully(); + } + + @Test + public void testTooManyClientChannels() throws Exception { + EventLoopGroup g = new OioEventLoopGroup(1); + ServerBootstrap sb = new ServerBootstrap(); + sb.channel(OioServerSocketChannel.class); + sb.group(g); + sb.childHandler(new ChannelHandlerAdapter()); + ChannelFuture f1 = sb.bind(0); + f1.sync(); + + Bootstrap cb = new Bootstrap(); + cb.channel(OioSocketChannel.class); + cb.group(g); + cb.handler(new ChannelHandlerAdapter()); + ChannelFuture f2 = cb.connect(NetUtil.LOCALHOST, ((InetSocketAddress) f1.channel().localAddress()).getPort()); + f2.await(); + + assertThat(f2.cause(), is(instanceOf(ChannelException.class))); + assertThat(f2.cause().getMessage().toLowerCase(), containsString("too many channels")); + + final CountDownLatch notified = new CountDownLatch(1); + f2.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + notified.countDown(); + } + }); + + notified.await(); + g.shutdownGracefully(); + } + + @Test + public void testTooManyAcceptedChannels() throws Exception { + EventLoopGroup g = new OioEventLoopGroup(1); + ServerBootstrap sb = new ServerBootstrap(); + sb.channel(OioServerSocketChannel.class); + sb.group(g); + sb.childHandler(new ChannelHandlerAdapter()); + ChannelFuture f1 = sb.bind(0); + f1.sync(); + + Socket s = new Socket(NetUtil.LOCALHOST, ((InetSocketAddress) f1.channel().localAddress()).getPort()); + assertThat(s.getInputStream().read(), is(-1)); + s.close(); + + g.shutdownGracefully(); + } +}