Merge pull request #613 from netty/timeout_improvement

Use a TimerTask to trigger handling of timeouts, so we can raise the sel...
This commit is contained in:
Norman Maurer 2012-09-21 08:33:42 -07:00
commit 0ee4bbf8e7
4 changed files with 47 additions and 9 deletions

View File

@ -27,6 +27,7 @@ import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.Timeout;
final class NioClientSocketChannel extends NioSocketChannel { final class NioClientSocketChannel extends NioSocketChannel {
@ -71,6 +72,8 @@ final class NioClientSocketChannel extends NioSocketChannel {
// Does not need to be volatile as it's accessed by only one thread. // Does not need to be volatile as it's accessed by only one thread.
long connectDeadlineNanos; long connectDeadlineNanos;
volatile Timeout timoutTimer;
NioClientSocketChannel( NioClientSocketChannel(
ChannelFactory factory, ChannelPipeline pipeline, ChannelFactory factory, ChannelPipeline pipeline,
ChannelSink sink, NioWorker worker) { ChannelSink sink, NioWorker worker) {

View File

@ -26,6 +26,8 @@ import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.SocketChannel; import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.util.ExternalResourceReleasable; import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timer;
import org.jboss.netty.util.internal.ExecutorUtil; import org.jboss.netty.util.internal.ExecutorUtil;
/** /**
@ -86,6 +88,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
private final Executor bossExecutor; private final Executor bossExecutor;
private final WorkerPool<NioWorker> workerPool; private final WorkerPool<NioWorker> workerPool;
private final NioClientSocketPipelineSink sink; private final NioClientSocketPipelineSink sink;
private final Timer timer;
/** /**
* Creates a new {@link NioClientSocketChannelFactory} which uses {@link Executors#newCachedThreadPool()} * Creates a new {@link NioClientSocketChannelFactory} which uses {@link Executors#newCachedThreadPool()}
@ -152,6 +155,12 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
public NioClientSocketChannelFactory( public NioClientSocketChannelFactory(
Executor bossExecutor, int bossCount, Executor bossExecutor, int bossCount,
WorkerPool<NioWorker> workerPool) { WorkerPool<NioWorker> workerPool) {
this(bossExecutor, bossCount, workerPool, new HashedWheelTimer());
}
public NioClientSocketChannelFactory(
Executor bossExecutor, int bossCount,
WorkerPool<NioWorker> workerPool, Timer timer) {
if (bossExecutor == null) { if (bossExecutor == null) {
throw new NullPointerException("bossExecutor"); throw new NullPointerException("bossExecutor");
@ -168,8 +177,9 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
this.bossExecutor = bossExecutor; this.bossExecutor = bossExecutor;
this.workerPool = workerPool; this.workerPool = workerPool;
this.timer = timer;
sink = new NioClientSocketPipelineSink( sink = new NioClientSocketPipelineSink(
bossExecutor, bossCount, workerPool); bossExecutor, bossCount, workerPool, timer);
} }
public SocketChannel newChannel(ChannelPipeline pipeline) { public SocketChannel newChannel(ChannelPipeline pipeline) {
@ -178,6 +188,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
public void releaseExternalResources() { public void releaseExternalResources() {
ExecutorUtil.terminate(bossExecutor); ExecutorUtil.terminate(bossExecutor);
timer.stop();
if (workerPool instanceof ExternalResourceReleasable) { if (workerPool instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) workerPool).releaseExternalResources(); ((ExternalResourceReleasable) workerPool).releaseExternalResources();
} }

View File

@ -32,6 +32,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -45,6 +46,10 @@ import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.Timer;
import org.jboss.netty.util.TimerTask;
import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.DeadLockProofWorker; import org.jboss.netty.util.internal.DeadLockProofWorker;
@ -64,11 +69,13 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
private final WorkerPool<NioWorker> workerPool; private final WorkerPool<NioWorker> workerPool;
private final Timer timer;
NioClientSocketPipelineSink( NioClientSocketPipelineSink(
Executor bossExecutor, int bossCount, WorkerPool<NioWorker> workerPool) { Executor bossExecutor, int bossCount, WorkerPool<NioWorker> workerPool, Timer timer) {
this.bossExecutor = bossExecutor; this.bossExecutor = bossExecutor;
this.timer = timer;
bosses = new Boss[bossCount]; bosses = new Boss[bossCount];
for (int i = 0; i < bosses.length; i ++) { for (int i = 0; i < bosses.length; i ++) {
bosses[i] = new Boss(i); bosses[i] = new Boss(i);
@ -179,6 +186,15 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
private final Object startStopLock = new Object(); private final Object startStopLock = new Object();
private final Queue<Runnable> registerTaskQueue = new ConcurrentLinkedQueue<Runnable>(); private final Queue<Runnable> registerTaskQueue = new ConcurrentLinkedQueue<Runnable>();
private final int subId; private final int subId;
private final TimerTask wakeupTask = new TimerTask() {
public void run(Timeout timeout) throws Exception {
if (selector != null) {
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
}
};
Boss(int subId) { Boss(int subId) {
this.subId = subId; this.subId = subId;
@ -231,10 +247,17 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
boolean offered = registerTaskQueue.offer(registerTask); boolean offered = registerTaskQueue.offer(registerTask);
assert offered; assert offered;
} }
int timeout = channel.getConfig().getConnectTimeoutMillis();
if (timeout > 0) {
if (!channel.isConnected()) {
channel.timoutTimer = timer.newTimeout(wakeupTask,
timeout, TimeUnit.MILLISECONDS);
}
}
if (wakenUp.compareAndSet(false, true)) { if (wakenUp.compareAndSet(false, true)) {
selector.wakeup(); selector.wakeup();
} }
} }
public void run() { public void run() {
@ -339,10 +362,8 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
// Handle connection timeout every 10 milliseconds approximately. // Handle connection timeout every 10 milliseconds approximately.
long currentTimeNanos = System.nanoTime(); long currentTimeNanos = System.nanoTime();
if (currentTimeNanos - lastConnectTimeoutCheckTimeNanos >= 10 * 1000000L) { lastConnectTimeoutCheckTimeNanos = currentTimeNanos;
lastConnectTimeoutCheckTimeNanos = currentTimeNanos; processConnectTimeout(selector.keys(), currentTimeNanos);
processConnectTimeout(selector.keys(), currentTimeNanos);
}
// Exit the loop when there's nothing to handle. // Exit the loop when there's nothing to handle.
// The shutdown flag is used to delay the shutdown of this // The shutdown flag is used to delay the shutdown of this
@ -471,6 +492,9 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
if (ch.channel.finishConnect()) { if (ch.channel.finishConnect()) {
k.cancel(); k.cancel();
if (ch.timoutTimer != null) {
ch.timoutTimer.cancel();
}
ch.worker.register(ch, ch.connectFuture); ch.worker.register(ch, ch.connectFuture);
} }
} }

View File

@ -29,7 +29,7 @@ final class SelectorUtil {
InternalLoggerFactory.getInstance(SelectorUtil.class); InternalLoggerFactory.getInstance(SelectorUtil.class);
static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2; static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
static final long DEFAULT_SELECT_TIMEOUT = 10; static final long DEFAULT_SELECT_TIMEOUT = 500;
static final long SELECT_TIMEOUT = static final long SELECT_TIMEOUT =
SystemPropertyUtil.getLong("org.jboss.netty.selectTimeout", DEFAULT_SELECT_TIMEOUT); SystemPropertyUtil.getLong("org.jboss.netty.selectTimeout", DEFAULT_SELECT_TIMEOUT);
static final long SELECT_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(SELECT_TIMEOUT); static final long SELECT_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(SELECT_TIMEOUT);