From 68777158a4d5a6d1fcedd4c68eda80ae40da310f Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 18 Sep 2012 08:21:53 +0200 Subject: [PATCH] Use a TimerTask to trigger handling of timeouts, so we can raise the select timeout again and so solve the problems with heavy context-switches --- .../socket/nio/NioClientSocketChannel.java | 3 +++ .../nio/NioClientSocketPipelineSink.java | 26 ++++++++++++++++++- .../channel/socket/nio/SelectorUtil.java | 2 +- 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannel.java index 2753f64973..b473c05928 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannel.java @@ -27,6 +27,7 @@ import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.Timeout; final class NioClientSocketChannel extends NioSocketChannel { @@ -71,6 +72,8 @@ final class NioClientSocketChannel extends NioSocketChannel { // Does not need to be volatile as it's accessed by only one thread. long connectDeadlineNanos; + volatile Timeout timoutTimer; + NioClientSocketChannel( ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, NioWorker worker) { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java index 12f08f4ae5..228efd72de 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -32,6 +32,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -45,12 +46,17 @@ import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.Timeout; +import org.jboss.netty.util.Timer; +import org.jboss.netty.util.TimerTask; import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.internal.DeadLockProofWorker; class NioClientSocketPipelineSink extends AbstractNioChannelSink { private static final AtomicInteger nextId = new AtomicInteger(); + private static final Timer TIMER = new HashedWheelTimer(); static final InternalLogger logger = InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class); @@ -179,6 +185,15 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { private final Object startStopLock = new Object(); private final Queue registerTaskQueue = new ConcurrentLinkedQueue(); private final int subId; + private final TimerTask wakeupTask = new TimerTask() { + public void run(Timeout timeout) throws Exception { + if (selector != null) { + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + } + } + }; Boss(int subId) { this.subId = subId; @@ -231,10 +246,16 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { boolean offered = registerTaskQueue.offer(registerTask); assert offered; } - + if (channel.connectDeadlineNanos > 0) { + if (!channel.isConnected()) { + channel.timoutTimer = TIMER.newTimeout(wakeupTask, + channel.connectDeadlineNanos, TimeUnit.NANOSECONDS); + } + } if (wakenUp.compareAndSet(false, true)) { selector.wakeup(); } + } public void run() { @@ -471,6 +492,9 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); if (ch.channel.finishConnect()) { k.cancel(); + if (ch.timoutTimer != null) { + ch.timoutTimer.cancel(); + } ch.worker.register(ch, ch.connectFuture); } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/SelectorUtil.java b/src/main/java/org/jboss/netty/channel/socket/nio/SelectorUtil.java index a263f9e4b2..c2889e92c6 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/SelectorUtil.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/SelectorUtil.java @@ -29,7 +29,7 @@ final class SelectorUtil { InternalLoggerFactory.getInstance(SelectorUtil.class); static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2; - static final long DEFAULT_SELECT_TIMEOUT = 10; + static final long DEFAULT_SELECT_TIMEOUT = 500; static final long SELECT_TIMEOUT = SystemPropertyUtil.getLong("org.jboss.netty.selectTimeout", DEFAULT_SELECT_TIMEOUT); static final long SELECT_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(SELECT_TIMEOUT);