From c3034c896489f76b95ae4517b8f3d86e75e46215 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Tue, 11 Jun 2013 18:46:39 +0900 Subject: [PATCH] Implement the cancellation of connection attmpe for NIO and OIO transport - Related issue: #1432 - Also added test cases to validate the implementation --- .../netty/util/concurrent/DefaultPromise.java | 2 + .../socket/SocketConnectionAttemptTest.java | 77 +++++++++++++++++++ .../netty/channel/nio/AbstractNioChannel.java | 21 ++++- .../netty/channel/oio/AbstractOioChannel.java | 7 +- 4 files changed, 105 insertions(+), 2 deletions(-) create mode 100644 testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketConnectionAttemptTest.java diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java index baa51c5868..9bf86d41c0 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java @@ -428,6 +428,8 @@ public class DefaultPromise extends AbstractFuture implements Promise { notifyAll(); } } + + notifyListeners(); return true; } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketConnectionAttemptTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketConnectionAttemptTest.java new file mode 100644 index 0000000000..8f98a20de4 --- /dev/null +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketConnectionAttemptTest.java @@ -0,0 +1,77 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelOption; +import io.netty.util.internal.SystemPropertyUtil; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; + +public class SocketConnectionAttemptTest extends AbstractClientSocketTest { + + private static final String BAD_HOST = SystemPropertyUtil.get("io.netty.testsuite.badHost", "255.255.255.0"); + + @Test(timeout = 30000) + public void testConnectTimeout() throws Throwable { + run(); + } + + public void testConnectTimeout(Bootstrap cb) throws Throwable { + TestHandler h = new TestHandler(); + cb.handler(h).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000); + ChannelFuture future = cb.connect(BAD_HOST, 8080); + try { + assertThat(future.await(3000), is(true)); + } finally { + future.channel().close(); + } + } + + @Test + public void testConnectCancellation() throws Throwable { + run(); + } + + public void testConnectCancellation(Bootstrap cb) throws Throwable { + TestHandler h = new TestHandler(); + cb.handler(h).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 4000); + ChannelFuture future = cb.connect(BAD_HOST, 8080); + try { + assertThat(future.await(1000), is(false)); + if (future.cancel(true)) { + assertThat(future.channel().closeFuture().await(500), is(true)); + assertThat(future.isCancelled(), is(true)); + } else { + // Cancellation not supported by the transport. + } + } finally { + future.channel().close(); + } + } + + private static class TestHandler extends ChannelInboundHandlerAdapter { + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index cf8f0896b5..ca0204a846 100755 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -18,6 +18,8 @@ package io.netty.channel.nio; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelPromise; import io.netty.channel.ConnectTimeoutException; import io.netty.channel.EventLoop; @@ -191,6 +193,19 @@ public abstract class AbstractNioChannel extends AbstractChannel { } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } + + promise.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isCancelled()) { + if (connectTimeoutFuture != null) { + connectTimeoutFuture.cancel(false); + } + connectPromise = null; + close(voidPromise()); + } + } + }); } } catch (Throwable t) { if (t instanceof ConnectException) { @@ -198,15 +213,19 @@ public abstract class AbstractNioChannel extends AbstractChannel { newT.setStackTrace(t.getStackTrace()); t = newT; } - promise.setFailure(t); closeIfClosed(); + promise.tryFailure(t); } } @Override public void finishConnect() { + // Note this method is invoked by the event loop only if the connection attempt was + // neither cancelled nor timed out. + assert eventLoop().inEventLoop(); assert connectPromise != null; + try { boolean wasActive = isActive(); doFinishConnect(); diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java index 05d78a7f1b..8d36f2b295 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java @@ -62,6 +62,11 @@ public abstract class AbstractOioChannel extends AbstractChannel { return; } + if (!promise.setUncancellable()) { + close(voidPromise()); + return; + } + try { boolean wasActive = isActive(); doConnect(remoteAddress, localAddress); @@ -75,8 +80,8 @@ public abstract class AbstractOioChannel extends AbstractChannel { newT.setStackTrace(t.getStackTrace()); t = newT; } - promise.setFailure(t); closeIfClosed(); + promise.setFailure(t); } } }