From b4610acda15f86b5027d44be5f72aad674f6ab71 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 11 May 2012 20:44:00 +0900 Subject: [PATCH] Implement connect timeout - Merged ClientChannelConfig back to ChannelConfig - AbstractChannel handles connect timeout making use of EventLoop.schedule() --- .../io/netty/channel/AbstractChannel.java | 32 ++++++++++++- .../java/io/netty/channel/ChannelConfig.java | 35 ++++++++++++-- .../io/netty/channel/ClientChannelConfig.java | 6 --- .../netty/channel/DefaultChannelConfig.java | 27 ++++++++++- .../channel/DefaultClientChannelConfig.java | 36 -------------- .../channel/socket/nio/SelectorEventLoop.java | 47 ------------------- 6 files changed, 88 insertions(+), 95 deletions(-) delete mode 100644 transport/src/main/java/io/netty/channel/ClientChannelConfig.java delete mode 100644 transport/src/main/java/io/netty/channel/DefaultClientChannelConfig.java diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 3907b2db68..f6c78b6f7e 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -21,11 +21,14 @@ import io.netty.util.DefaultAttributeMap; import io.netty.util.internal.ConcurrentHashMap; import java.io.IOException; +import java.net.ConnectException; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** * A skeletal {@link Channel} implementation. @@ -73,6 +76,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private final ChannelPipeline pipeline = new DefaultChannelPipeline(this); private final List closureListeners = new ArrayList(4); private final ChannelFuture succeededFuture = new SucceededChannelFuture(this); + private final ChannelFuture voidFuture = new VoidChannelFuture(AbstractChannel.this); private volatile EventLoop eventLoop; private volatile boolean registered; @@ -83,6 +87,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha * connection attempts will fail. */ private ChannelFuture connectFuture; + private ScheduledFuture connectTimeoutFuture; + private ConnectException connectTimeoutException; + private long writtenAmount; /** Cache for the string representation of this channel */ @@ -385,8 +392,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private class DefaultUnsafe implements Unsafe { - private final ChannelFuture voidFuture = new VoidChannelFuture(AbstractChannel.this); - @Override public java.nio.channels.Channel ch() { return javaChannel(); @@ -498,6 +503,28 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } else { connectFuture = future; + + // Schedule connect timeout. + int connectTimeoutMillis = config().getConnectTimeoutMillis(); + if (connectTimeoutMillis > 0) { + connectTimeoutFuture = eventLoop().schedule(new Runnable() { + @Override + public void run() { + if (connectTimeoutException == null) { + connectTimeoutException = new ConnectException("connection timed out"); + } + ChannelFuture connectFuture = AbstractChannel.this.connectFuture; + if (connectFuture == null) { + return; + } else { + if (connectFuture.setFailure(connectTimeoutException)) { + pipeline().fireExceptionCaught(connectTimeoutException); + close(voidFuture()); + } + } + } + }, connectTimeoutMillis, TimeUnit.MILLISECONDS); + } } } catch (Throwable t) { future.setFailure(t); @@ -526,6 +553,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha pipeline().fireExceptionCaught(t); closeIfClosed(); } finally { + connectTimeoutFuture.cancel(false); connectFuture = null; } } diff --git a/transport/src/main/java/io/netty/channel/ChannelConfig.java b/transport/src/main/java/io/netty/channel/ChannelConfig.java index 9c92ed4530..5e7c8d2938 100644 --- a/transport/src/main/java/io/netty/channel/ChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/ChannelConfig.java @@ -16,7 +16,7 @@ package io.netty.channel; import io.netty.channel.socket.SocketChannelConfig; -import io.netty.channel.socket.nio.NioSocketChannelConfig; +import io.netty.channel.socket.nio.NioChannelConfig; import java.util.Map; @@ -38,9 +38,19 @@ import java.util.Map; * the configuration of a {@link Channel} without down-casting its associated * {@link ChannelConfig}. To update an option map, please call {@link #setOptions(Map)}. *

- * Options are available in the sub-types of {@link ChannelConfig}. For + * All {@link ChannelConfig} has the following options: + * + * + * + * + * + * + * + *
NameAssociated setter method
{@code "connectTimeoutMillis"}{@link #setConnectTimeoutMillis(int)}
+ *

+ * More options are available in the sub-types of {@link ChannelConfig}. For * example, you can configure the parameters which are specific to a TCP/IP - * socket as explained in {@link SocketChannelConfig} or {@link NioSocketChannelConfig}. + * socket as explained in {@link SocketChannelConfig} or {@link NioChannelConfig}. * * @apiviz.has io.netty.channel.ChannelPipelineFactory * @apiviz.composedOf io.netty.channel.ReceiveBufferSizePredictor @@ -75,4 +85,23 @@ public interface ChannelConfig { * @return {@code true} if and only if the property has been set */ boolean setOption(String name, Object value); + + /** + * Returns the connect timeout of the channel in milliseconds. If the + * {@link Channel} does not support connect operation, this property is not + * used at all, and therefore will be ignored. + * + * @return the connect timeout in milliseconds. {@code 0} if disabled. + */ + int getConnectTimeoutMillis(); + + /** + * Sets the connect timeout of the channel in milliseconds. If the + * {@link Channel} does not support connect operation, this property is not + * used at all, and therefore will be ignored. + * + * @param connectTimeoutMillis the connect timeout in milliseconds. + * {@code 0} to disable. + */ + void setConnectTimeoutMillis(int connectTimeoutMillis); } diff --git a/transport/src/main/java/io/netty/channel/ClientChannelConfig.java b/transport/src/main/java/io/netty/channel/ClientChannelConfig.java deleted file mode 100644 index 551ad14c33..0000000000 --- a/transport/src/main/java/io/netty/channel/ClientChannelConfig.java +++ /dev/null @@ -1,6 +0,0 @@ -package io.netty.channel; - -public interface ClientChannelConfig extends ChannelConfig { - long getConnectTimeoutMillis(); - void setConnectTimeoutMillis(long connectTimeoutMillis); -} diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java index 0aea4ca2de..50d0e6a60f 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java @@ -16,6 +16,7 @@ package io.netty.channel; import io.netty.channel.socket.SocketChannelConfig; +import io.netty.util.internal.ConversionUtil; import java.util.Map; import java.util.Map.Entry; @@ -24,6 +25,11 @@ import java.util.Map.Entry; * The default {@link SocketChannelConfig} implementation. */ public class DefaultChannelConfig implements ChannelConfig { + + private static final int DEFAULT_CONNECT_TIMEOUT = 30000; + + private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT; + @Override public void setOptions(Map options) { for (Entry e: options.entrySet()) { @@ -33,6 +39,25 @@ public class DefaultChannelConfig implements ChannelConfig { @Override public boolean setOption(String key, Object value) { - return false; + if ("connectTimeoutMillis".equals(key)) { + setConnectTimeoutMillis(ConversionUtil.toInt(value)); + } else { + return false; + } + return true; + } + + @Override + public int getConnectTimeoutMillis() { + return connectTimeoutMillis; + } + + @Override + public void setConnectTimeoutMillis(int connectTimeoutMillis) { + if (connectTimeoutMillis < 0) { + throw new IllegalArgumentException(String.format( + "connectTimeoutMillis: %d (expected: >= 0)", connectTimeoutMillis)); + } + this.connectTimeoutMillis = connectTimeoutMillis; } } diff --git a/transport/src/main/java/io/netty/channel/DefaultClientChannelConfig.java b/transport/src/main/java/io/netty/channel/DefaultClientChannelConfig.java deleted file mode 100644 index c74a104019..0000000000 --- a/transport/src/main/java/io/netty/channel/DefaultClientChannelConfig.java +++ /dev/null @@ -1,36 +0,0 @@ -package io.netty.channel; - -import io.netty.util.internal.ConversionUtil; - -import java.util.concurrent.TimeUnit; - -public class DefaultClientChannelConfig extends DefaultChannelConfig implements - ClientChannelConfig { - - private static final long DEFAULT_CONNECT_TIMEOUT = TimeUnit.SECONDS.toMillis(30); - - private volatile long connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT; - - @Override - public boolean setOption(String key, Object value) { - if ("connectTimeoutMillis".equals(key)) { - setConnectTimeoutMillis(ConversionUtil.toLong(value)); - } else { - return false; - } - return true; - } - - @Override - public long getConnectTimeoutMillis() { - return connectTimeoutMillis; - } - - @Override - public void setConnectTimeoutMillis(long connectTimeoutMillis) { - if (connectTimeoutMillis < 0) { - throw new IllegalArgumentException("connectTimeoutMillis: " + connectTimeoutMillis); - } - this.connectTimeoutMillis = connectTimeoutMillis; - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java b/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java index e897fa7feb..ad60c615a3 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java @@ -22,7 +22,6 @@ import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import java.io.IOException; -import java.net.ConnectException; import java.nio.channels.CancelledKeyException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -90,8 +89,6 @@ public class SelectorEventLoop extends SingleThreadEventLoop { @Override protected void run() { - long lastConnectTimeoutCheckTimeNanos = System.nanoTime(); - Selector selector = this.selector; for (;;) { @@ -136,13 +133,6 @@ public class SelectorEventLoop extends SingleThreadEventLoop { processTaskQueue(); processSelectedKeys(); - // Handle connection timeout every 10 milliseconds approximately. - long currentTimeNanos = System.nanoTime(); - if (currentTimeNanos - lastConnectTimeoutCheckTimeNanos >= 10 * 1000000L) { - lastConnectTimeoutCheckTimeNanos = currentTimeNanos; - processConnectTimeout(selector.keys(), currentTimeNanos); - } - if (isShutdown()) { closeAll(); break; @@ -223,43 +213,6 @@ public class SelectorEventLoop extends SingleThreadEventLoop { } } - protected void processConnectTimeout(Set keys, long currentTimeNanos) { - ConnectException cause = null; - for (SelectionKey k: keys) { - if (!k.isValid()) { - // Comment the close call again as it gave us major problems with ClosedChannelExceptions. - // - // See: - // * https://github.com/netty/netty/issues/142 - // * https://github.com/netty/netty/issues/138 - // - //close(k); - continue; - } - - // Something is ready so skip it - if (k.readyOps() != 0) { - continue; - } - // check if the channel is in - // FIXME: Implement connect timeout. -// Channel ch = (Channel) k.attachment(); -// if (attachment instanceof NioClientSocketChannel) { -// NioClientSocketChannel ch = (NioClientSocketChannel) attachment; -// if (!ch.isConnected() && ch.connectDeadlineNanos > 0 && currentTimeNanos >= ch.connectDeadlineNanos) { -// -// if (cause == null) { -// cause = new ConnectException("connection timed out"); -// } -// -// ch.connectFuture.setFailure(cause); -// fireExceptionCaught(ch, cause); -// ch.getWorker().close(ch, succeededFuture(ch)); -// } -// } - } - } - private boolean cleanUpCancelledKeys() throws IOException { if (cancelledKeys >= CLEANUP_INTERVAL) { cancelledKeys = 0;