Use a TimerTask to trigger handling of timeouts, so we can raise the select timeout again and so solve the problems with heavy context-switches
This commit is contained in:
parent
c3af4427bb
commit
68777158a4
@ -27,6 +27,7 @@ import org.jboss.netty.channel.ChannelPipeline;
|
||||
import org.jboss.netty.channel.ChannelSink;
|
||||
import org.jboss.netty.logging.InternalLogger;
|
||||
import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
import org.jboss.netty.util.Timeout;
|
||||
|
||||
final class NioClientSocketChannel extends NioSocketChannel {
|
||||
|
||||
@ -71,6 +72,8 @@ final class NioClientSocketChannel extends NioSocketChannel {
|
||||
// Does not need to be volatile as it's accessed by only one thread.
|
||||
long connectDeadlineNanos;
|
||||
|
||||
volatile Timeout timoutTimer;
|
||||
|
||||
NioClientSocketChannel(
|
||||
ChannelFactory factory, ChannelPipeline pipeline,
|
||||
ChannelSink sink, NioWorker worker) {
|
||||
|
@ -32,6 +32,7 @@ import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@ -45,12 +46,17 @@ import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.logging.InternalLogger;
|
||||
import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
import org.jboss.netty.util.HashedWheelTimer;
|
||||
import org.jboss.netty.util.Timeout;
|
||||
import org.jboss.netty.util.Timer;
|
||||
import org.jboss.netty.util.TimerTask;
|
||||
import org.jboss.netty.util.ThreadRenamingRunnable;
|
||||
import org.jboss.netty.util.internal.DeadLockProofWorker;
|
||||
|
||||
class NioClientSocketPipelineSink extends AbstractNioChannelSink {
|
||||
|
||||
private static final AtomicInteger nextId = new AtomicInteger();
|
||||
private static final Timer TIMER = new HashedWheelTimer();
|
||||
|
||||
static final InternalLogger logger =
|
||||
InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);
|
||||
@ -179,6 +185,15 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
|
||||
private final Object startStopLock = new Object();
|
||||
private final Queue<Runnable> registerTaskQueue = new ConcurrentLinkedQueue<Runnable>();
|
||||
private final int subId;
|
||||
private final TimerTask wakeupTask = new TimerTask() {
|
||||
public void run(Timeout timeout) throws Exception {
|
||||
if (selector != null) {
|
||||
if (wakenUp.compareAndSet(false, true)) {
|
||||
selector.wakeup();
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Boss(int subId) {
|
||||
this.subId = subId;
|
||||
@ -231,10 +246,16 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
|
||||
boolean offered = registerTaskQueue.offer(registerTask);
|
||||
assert offered;
|
||||
}
|
||||
|
||||
if (channel.connectDeadlineNanos > 0) {
|
||||
if (!channel.isConnected()) {
|
||||
channel.timoutTimer = TIMER.newTimeout(wakeupTask,
|
||||
channel.connectDeadlineNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
}
|
||||
if (wakenUp.compareAndSet(false, true)) {
|
||||
selector.wakeup();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void run() {
|
||||
@ -471,6 +492,9 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
|
||||
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
|
||||
if (ch.channel.finishConnect()) {
|
||||
k.cancel();
|
||||
if (ch.timoutTimer != null) {
|
||||
ch.timoutTimer.cancel();
|
||||
}
|
||||
ch.worker.register(ch, ch.connectFuture);
|
||||
}
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ final class SelectorUtil {
|
||||
InternalLoggerFactory.getInstance(SelectorUtil.class);
|
||||
|
||||
static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
|
||||
static final long DEFAULT_SELECT_TIMEOUT = 10;
|
||||
static final long DEFAULT_SELECT_TIMEOUT = 500;
|
||||
static final long SELECT_TIMEOUT =
|
||||
SystemPropertyUtil.getLong("org.jboss.netty.selectTimeout", DEFAULT_SELECT_TIMEOUT);
|
||||
static final long SELECT_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(SELECT_TIMEOUT);
|
||||
|
Loading…
Reference in New Issue
Block a user