diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannel.java index 2753f64973..b473c05928 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannel.java @@ -27,6 +27,7 @@ import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.Timeout; final class NioClientSocketChannel extends NioSocketChannel { @@ -71,6 +72,8 @@ final class NioClientSocketChannel extends NioSocketChannel { // Does not need to be volatile as it's accessed by only one thread. long connectDeadlineNanos; + volatile Timeout timoutTimer; + NioClientSocketChannel( ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, NioWorker worker) { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java index 5b0800ee48..4a75ab6f94 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java @@ -26,6 +26,8 @@ import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.jboss.netty.channel.socket.SocketChannel; import org.jboss.netty.util.ExternalResourceReleasable; +import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.Timer; import org.jboss.netty.util.internal.ExecutorUtil; /** @@ -86,6 +88,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory private final Executor bossExecutor; private final WorkerPool workerPool; private final NioClientSocketPipelineSink sink; + private final Timer timer; /** * Creates a new {@link NioClientSocketChannelFactory} which uses {@link Executors#newCachedThreadPool()} @@ -152,6 +155,12 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory public NioClientSocketChannelFactory( Executor bossExecutor, int bossCount, WorkerPool workerPool) { + this(bossExecutor, bossCount, workerPool, new HashedWheelTimer()); + } + + public NioClientSocketChannelFactory( + Executor bossExecutor, int bossCount, + WorkerPool workerPool, Timer timer) { if (bossExecutor == null) { throw new NullPointerException("bossExecutor"); @@ -168,8 +177,9 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory this.bossExecutor = bossExecutor; this.workerPool = workerPool; + this.timer = timer; sink = new NioClientSocketPipelineSink( - bossExecutor, bossCount, workerPool); + bossExecutor, bossCount, workerPool, timer); } public SocketChannel newChannel(ChannelPipeline pipeline) { @@ -178,6 +188,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory public void releaseExternalResources() { ExecutorUtil.terminate(bossExecutor); + timer.stop(); if (workerPool instanceof ExternalResourceReleasable) { ((ExternalResourceReleasable) workerPool).releaseExternalResources(); } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java index 12f08f4ae5..3d79ce51fa 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -32,6 +32,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -45,6 +46,10 @@ import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.Timeout; +import org.jboss.netty.util.Timer; +import org.jboss.netty.util.TimerTask; import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.internal.DeadLockProofWorker; @@ -64,11 +69,13 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { private final WorkerPool workerPool; + private final Timer timer; + NioClientSocketPipelineSink( - Executor bossExecutor, int bossCount, WorkerPool workerPool) { + Executor bossExecutor, int bossCount, WorkerPool workerPool, Timer timer) { this.bossExecutor = bossExecutor; - + this.timer = timer; bosses = new Boss[bossCount]; for (int i = 0; i < bosses.length; i ++) { bosses[i] = new Boss(i); @@ -179,6 +186,15 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { private final Object startStopLock = new Object(); private final Queue registerTaskQueue = new ConcurrentLinkedQueue(); private final int subId; + private final TimerTask wakeupTask = new TimerTask() { + public void run(Timeout timeout) throws Exception { + if (selector != null) { + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + } + } + }; Boss(int subId) { this.subId = subId; @@ -231,10 +247,17 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { boolean offered = registerTaskQueue.offer(registerTask); assert offered; } - + int timeout = channel.getConfig().getConnectTimeoutMillis(); + if (timeout > 0) { + if (!channel.isConnected()) { + channel.timoutTimer = timer.newTimeout(wakeupTask, + timeout, TimeUnit.MILLISECONDS); + } + } if (wakenUp.compareAndSet(false, true)) { selector.wakeup(); } + } public void run() { @@ -339,10 +362,8 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { // Handle connection timeout every 10 milliseconds approximately. long currentTimeNanos = System.nanoTime(); - if (currentTimeNanos - lastConnectTimeoutCheckTimeNanos >= 10 * 1000000L) { - lastConnectTimeoutCheckTimeNanos = currentTimeNanos; - processConnectTimeout(selector.keys(), currentTimeNanos); - } + lastConnectTimeoutCheckTimeNanos = currentTimeNanos; + processConnectTimeout(selector.keys(), currentTimeNanos); // Exit the loop when there's nothing to handle. // The shutdown flag is used to delay the shutdown of this @@ -471,6 +492,9 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); if (ch.channel.finishConnect()) { k.cancel(); + if (ch.timoutTimer != null) { + ch.timoutTimer.cancel(); + } ch.worker.register(ch, ch.connectFuture); } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/SelectorUtil.java b/src/main/java/org/jboss/netty/channel/socket/nio/SelectorUtil.java index a263f9e4b2..c2889e92c6 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/SelectorUtil.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/SelectorUtil.java @@ -29,7 +29,7 @@ final class SelectorUtil { InternalLoggerFactory.getInstance(SelectorUtil.class); static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2; - static final long DEFAULT_SELECT_TIMEOUT = 10; + static final long DEFAULT_SELECT_TIMEOUT = 500; static final long SELECT_TIMEOUT = SystemPropertyUtil.getLong("org.jboss.netty.selectTimeout", DEFAULT_SELECT_TIMEOUT); static final long SELECT_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(SELECT_TIMEOUT);