From 7ffd228cbf2c2fca408ecb125dabce68e77edf2a Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 14 Aug 2009 20:22:50 +0000 Subject: [PATCH] Fixed issue: NETTY-204 Connection timeout does not work with the NIO socket transport. * implemented connection timeout for NioClientSocketChannel --- .../socket/nio/NioClientSocketChannel.java | 3 ++ .../nio/NioClientSocketPipelineSink.java | 48 +++++++++++++++++-- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannel.java index 67ff4f29eb..10993c8828 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannel.java @@ -80,6 +80,9 @@ final class NioClientSocketChannel extends NioSocketChannel { volatile ChannelFuture connectFuture; volatile boolean boundManually; + // Does not need to be volatile as it's accessed by only one thread. + long connectDeadlineNanos; + NioClientSocketChannel( ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, NioWorker worker) { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java index c134cd1d4b..a09f8fa9c2 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -25,6 +25,7 @@ package org.jboss.netty.channel.socket.nio; import static org.jboss.netty.channel.Channels.*; import java.io.IOException; +import java.net.ConnectException; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; @@ -173,7 +174,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { super(); } - void register(NioSocketChannel channel) { + void register(NioClientSocketChannel channel) { Runnable registerTask = new RegisterTask(this, channel); Selector selector; @@ -227,6 +228,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { public void run() { boolean shutdown = false; Selector selector = this.selector; + long lastConnectTimeoutCheckTimeNanos = System.nanoTime(); for (;;) { wakenUp.set(false); @@ -271,6 +273,13 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { processSelectedKeys(selector.selectedKeys()); } + // Handle connection timeout every 0.5 seconds approximately. + long currentTimeNanos = System.nanoTime(); + if (currentTimeNanos - lastConnectTimeoutCheckTimeNanos >= 500 * 1000000L) { + lastConnectTimeoutCheckTimeNanos = currentTimeNanos; + processConnectTimeout(selector.keys(), currentTimeNanos); + } + // Exit the loop when there's nothing to handle. // The shutdown flag is used to delay the shutdown of this // loop to avoid excessive Selector creation when @@ -344,6 +353,34 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { } } + private void processConnectTimeout(Set keys, long currentTimeNanos) { + ConnectException cause = null; + for (SelectionKey k: keys) { + if (!k.isValid()) { + close(k); + continue; + } + + if (k.isConnectable()) { + connect(k); + continue; + } + + NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); + if (ch.connectDeadlineNanos > 0 && + currentTimeNanos >= ch.connectDeadlineNanos) { + + if (cause == null) { + cause = new ConnectException("connection timed out"); + } + + ch.connectFuture.setFailure(cause); + fireExceptionCaught(ch, cause); + close(k); + } + } + } + private void connect(SelectionKey k) { NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); try { @@ -367,9 +404,9 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { private static final class RegisterTask implements Runnable { private final Boss boss; - private final NioSocketChannel channel; + private final NioClientSocketChannel channel; - RegisterTask(Boss boss, NioSocketChannel channel) { + RegisterTask(Boss boss, NioClientSocketChannel channel) { this.boss = boss; this.channel = channel; } @@ -381,6 +418,11 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { } catch (ClosedChannelException e) { NioWorker.close(channel, succeededFuture(channel)); } + + int connectTimeout = channel.getConfig().getConnectTimeoutMillis(); + if (connectTimeout > 0) { + channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L; + } } } }