More graceful registration failure

- Fixes #2060
- Ensure to return a future/promise implementation that does not fail with 'not registered to an event loop' error for registration operations
- If there is no usable event loop available, GlobalEventExecutor.INSTANCE is used as a fallback.
This commit is contained in:
Trustin Lee 2013-12-21 18:08:58 +09:00
parent 836ac02ab9
commit 89a7cb8e71
6 changed files with 155 additions and 20 deletions

View File

@ -23,9 +23,11 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.StringUtil;
import java.net.InetAddress;
@ -267,16 +269,23 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
}
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regPromise = initAndRegister();
final Channel channel = regPromise.channel();
final ChannelPromise promise = channel.newPromise();
if (regPromise.isDone()) {
doBind0(regPromise, channel, localAddress, promise);
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
final ChannelPromise promise;
if (regFuture.isDone()) {
promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
} else {
regPromise.addListener(new ChannelFutureListener() {
// Registration future is almost always fulfilled already, but just in case it's not.
promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doBind0(future, channel, localAddress, promise);
doBind0(regFuture, channel, localAddress, promise);
}
});
}
@ -293,9 +302,8 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
return channel.newFailedFuture(t);
}
ChannelPromise regPromise = channel.newPromise();
group().register(channel, regPromise);
if (regPromise.cause() != null) {
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
@ -312,7 +320,7 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regPromise;
return regFuture;
}
abstract void init(Channel channel) throws Exception;
@ -323,7 +331,6 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {

View File

@ -17,6 +17,8 @@ package io.netty.bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@ -228,7 +230,7 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Channel child = (Channel) msg;
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
@ -247,13 +249,22 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
}
try {
childGroup.register(child);
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
forceClose(child, future.cause());
}
});
} catch (Throwable t) {
child.unsafe().closeForcibly();
logger.warn("Failed to register an accepted channel: " + child, t);
forceClose(child, t);
}
}
private static void forceClose(Channel child, Throwable t) {
child.unsafe().closeForcibly();
logger.warn("Failed to register an accepted channel: " + child, t);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();

View File

@ -45,7 +45,7 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
@Override
public ChannelFuture register(Channel channel) {
return register(channel, channel.newPromise());
return register(channel, new DefaultChannelPromise(channel, this));
}
@Override

View File

@ -256,9 +256,10 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
throw new NullPointerException("channel");
}
try {
return nextChild().register(channel);
EventLoop l = nextChild();
return l.register(channel, new DefaultChannelPromise(channel, l));
} catch (Throwable t) {
return channel.newFailedFuture(t);
return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);
}
}

View File

@ -18,6 +18,7 @@ package io.netty.channel.embedded;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.AbstractEventExecutor;
@ -90,7 +91,7 @@ final class EmbeddedEventLoop extends AbstractEventExecutor implements EventLoop
@Override
public ChannelFuture register(Channel channel) {
return register(channel, channel.newPromise());
return register(channel, new DefaultChannelPromise(channel, this));
}
@Override

View File

@ -0,0 +1,115 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.oio;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.oio.OioServerSocketChannel;
import io.netty.channel.socket.oio.OioSocketChannel;
import io.netty.util.NetUtil;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
public class OioEventLoopTest {
@Test
public void testTooManyServerChannels() throws Exception {
EventLoopGroup g = new OioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.channel(OioServerSocketChannel.class);
b.group(g);
b.childHandler(new ChannelInboundHandlerAdapter());
ChannelFuture f1 = b.bind(0);
f1.sync();
ChannelFuture f2 = b.bind(0);
f2.await();
assertThat(f2.cause(), is(instanceOf(ChannelException.class)));
assertThat(f2.cause().getMessage().toLowerCase(), containsString("too many channels"));
final CountDownLatch notified = new CountDownLatch(1);
f2.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
notified.countDown();
}
});
notified.await();
g.shutdownGracefully();
}
@Test
public void testTooManyClientChannels() throws Exception {
EventLoopGroup g = new OioEventLoopGroup(1);
ServerBootstrap sb = new ServerBootstrap();
sb.channel(OioServerSocketChannel.class);
sb.group(g);
sb.childHandler(new ChannelInboundHandlerAdapter());
ChannelFuture f1 = sb.bind(0);
f1.sync();
Bootstrap cb = new Bootstrap();
cb.channel(OioSocketChannel.class);
cb.group(g);
cb.handler(new ChannelInboundHandlerAdapter());
ChannelFuture f2 = cb.connect(NetUtil.LOCALHOST, ((InetSocketAddress) f1.channel().localAddress()).getPort());
f2.await();
assertThat(f2.cause(), is(instanceOf(ChannelException.class)));
assertThat(f2.cause().getMessage().toLowerCase(), containsString("too many channels"));
final CountDownLatch notified = new CountDownLatch(1);
f2.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
notified.countDown();
}
});
notified.await();
g.shutdownGracefully();
}
@Test
public void testTooManyAcceptedChannels() throws Exception {
EventLoopGroup g = new OioEventLoopGroup(1);
ServerBootstrap sb = new ServerBootstrap();
sb.channel(OioServerSocketChannel.class);
sb.group(g);
sb.childHandler(new ChannelInboundHandlerAdapter());
ChannelFuture f1 = sb.bind(0);
f1.sync();
Socket s = new Socket(NetUtil.LOCALHOST, ((InetSocketAddress) f1.channel().localAddress()).getPort());
assertThat(s.getInputStream().read(), is(-1));
s.close();
g.shutdownGracefully();
}
}