Fixed issue: NETTY-204 Connection timeout does not work with the NIO socket transport.

* implemented connection timeout for NioClientSocketChannel
This commit is contained in:
Trustin Lee 2009-08-14 20:22:50 +00:00
parent 05ddfdab46
commit 7ffd228cbf
2 changed files with 48 additions and 3 deletions

View File

@ -80,6 +80,9 @@ final class NioClientSocketChannel extends NioSocketChannel {
volatile ChannelFuture connectFuture; volatile ChannelFuture connectFuture;
volatile boolean boundManually; volatile boolean boundManually;
// Does not need to be volatile as it's accessed by only one thread.
long connectDeadlineNanos;
NioClientSocketChannel( NioClientSocketChannel(
ChannelFactory factory, ChannelPipeline pipeline, ChannelFactory factory, ChannelPipeline pipeline,
ChannelSink sink, NioWorker worker) { ChannelSink sink, NioWorker worker) {

View File

@ -25,6 +25,7 @@ package org.jboss.netty.channel.socket.nio;
import static org.jboss.netty.channel.Channels.*; import static org.jboss.netty.channel.Channels.*;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
@ -173,7 +174,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
super(); super();
} }
void register(NioSocketChannel channel) { void register(NioClientSocketChannel channel) {
Runnable registerTask = new RegisterTask(this, channel); Runnable registerTask = new RegisterTask(this, channel);
Selector selector; Selector selector;
@ -227,6 +228,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
public void run() { public void run() {
boolean shutdown = false; boolean shutdown = false;
Selector selector = this.selector; Selector selector = this.selector;
long lastConnectTimeoutCheckTimeNanos = System.nanoTime();
for (;;) { for (;;) {
wakenUp.set(false); wakenUp.set(false);
@ -271,6 +273,13 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
processSelectedKeys(selector.selectedKeys()); processSelectedKeys(selector.selectedKeys());
} }
// Handle connection timeout every 0.5 seconds approximately.
long currentTimeNanos = System.nanoTime();
if (currentTimeNanos - lastConnectTimeoutCheckTimeNanos >= 500 * 1000000L) {
lastConnectTimeoutCheckTimeNanos = currentTimeNanos;
processConnectTimeout(selector.keys(), currentTimeNanos);
}
// Exit the loop when there's nothing to handle. // Exit the loop when there's nothing to handle.
// The shutdown flag is used to delay the shutdown of this // The shutdown flag is used to delay the shutdown of this
// loop to avoid excessive Selector creation when // loop to avoid excessive Selector creation when
@ -344,6 +353,34 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
} }
} }
private void processConnectTimeout(Set<SelectionKey> keys, long currentTimeNanos) {
ConnectException cause = null;
for (SelectionKey k: keys) {
if (!k.isValid()) {
close(k);
continue;
}
if (k.isConnectable()) {
connect(k);
continue;
}
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
if (ch.connectDeadlineNanos > 0 &&
currentTimeNanos >= ch.connectDeadlineNanos) {
if (cause == null) {
cause = new ConnectException("connection timed out");
}
ch.connectFuture.setFailure(cause);
fireExceptionCaught(ch, cause);
close(k);
}
}
}
private void connect(SelectionKey k) { private void connect(SelectionKey k) {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
try { try {
@ -367,9 +404,9 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
private static final class RegisterTask implements Runnable { private static final class RegisterTask implements Runnable {
private final Boss boss; private final Boss boss;
private final NioSocketChannel channel; private final NioClientSocketChannel channel;
RegisterTask(Boss boss, NioSocketChannel channel) { RegisterTask(Boss boss, NioClientSocketChannel channel) {
this.boss = boss; this.boss = boss;
this.channel = channel; this.channel = channel;
} }
@ -381,6 +418,11 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
NioWorker.close(channel, succeededFuture(channel)); NioWorker.close(channel, succeededFuture(channel));
} }
int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
if (connectTimeout > 0) {
channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
}
} }
} }
} }