* Improved the throughput of the server-side accept operation

* Added FastQueue.isEmpty()
This commit is contained in:
Trustin Lee 2008-09-28 15:01:21 +00:00
parent 4d17db6eb1
commit dccc9f8665
3 changed files with 92 additions and 72 deletions

View File

@ -202,10 +202,6 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
for (;;) {
try {
SocketChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket == null) {
continue;
}
try {
ChannelPipeline pipeline =
channel.getConfig().getPipelineFactory().getPipeline();

View File

@ -32,7 +32,6 @@ import java.nio.channels.ScatteringByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
@ -72,11 +71,15 @@ class NioWorker implements Runnable {
private final int id;
private final Executor executor;
private final AtomicBoolean started = new AtomicBoolean();
volatile Thread thread;
volatile Selector selector;
final AtomicBoolean wakenUp = new AtomicBoolean();
final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
private volatile Thread thread;
private volatile Selector selector;
private final AtomicBoolean wakenUp = new AtomicBoolean();
private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock();
//private final FastQueue<Runnable> taskQueue = new FastQueue<Runnable>();
//private final ConcurrentFastQueue<Runnable> taskQueue = new ConcurrentFastQueue<Runnable>();
private final FastQueue<Runnable> registerTaskQueue = new FastQueue<Runnable>();
private final ConcurrentLinkedQueue<Runnable> writeTaskQueue = new ConcurrentLinkedQueue<Runnable>();
NioWorker(int bossId, int id, Executor executor) {
this.bossId = bossId;
@ -104,61 +107,25 @@ class NioWorker implements Runnable {
}
}
if (firstChannel) {
try {
channel.socket.register(selector, SelectionKey.OP_READ, channel);
if (future != null) {
future.setSuccess();
}
} catch (ClosedChannelException e) {
future.setFailure(e);
throw new ChannelException(
"Failed to register a socket to the selector.", e);
}
boolean server = !(channel instanceof NioClientSocketChannel);
if (server) {
fireChannelOpen(channel);
fireChannelBound(channel, channel.getLocalAddress());
} else if (!((NioClientSocketChannel) channel).boundManually) {
fireChannelBound(channel, channel.getLocalAddress());
}
fireChannelConnected(channel, channel.getRemoteAddress());
Runnable registerTask = new RegisterTask(selector, channel, future, server);
if (firstChannel) {
registerTask.run();
String threadName =
(server ? "New I/O server worker #"
: "New I/O client worker #") + bossId + '-' + id;
executor.execute(new ThreadRenamingRunnable(this, threadName));
} else {
selectorGuard.readLock().lock();
shutdownLock.readLock().lock();
try {
registerTaskQueue.offer(registerTask);
} finally {
shutdownLock.readLock().unlock();
}
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
try {
channel.socket.register(selector, SelectionKey.OP_READ, channel);
if (future != null) {
future.setSuccess();
}
} catch (ClosedChannelException e) {
future.setFailure(e);
throw new ChannelException(
"Failed to register a socket to the selector.", e);
}
boolean server = !(channel instanceof NioClientSocketChannel);
if (server) {
fireChannelOpen(channel);
fireChannelBound(channel, channel.getLocalAddress());
} else if (!((NioClientSocketChannel) channel).boundManually) {
fireChannelBound(channel, channel.getLocalAddress());
}
fireChannelConnected(channel, channel.getRemoteAddress());
} finally {
selectorGuard.readLock().unlock();
}
}
}
@ -170,15 +137,18 @@ class NioWorker implements Runnable {
for (;;) {
wakenUp.set(false);
if (CONSTRAINT_LEVEL != 0) {
selectorGuard.writeLock().lock();
// This empty synchronization block prevents the selector
// from acquiring its lock.
selectorGuard.writeLock().unlock();
}
try {
int selectedKeyCount = selector.select(500);
processTaskQueue();
processRegisterTaskQueue();
processWriteTaskQueue();
if (selectedKeyCount > 0) {
processSelectedKeys(selector.selectedKeys());
@ -193,9 +163,9 @@ class NioWorker implements Runnable {
if (shutdown ||
executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {
selectorGuard.writeLock().lock();
shutdownLock.writeLock().lock();
try {
if (selector.keys().isEmpty()) {
if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
try {
selector.close();
} catch (IOException e) {
@ -210,7 +180,7 @@ class NioWorker implements Runnable {
shutdown = false;
}
} finally {
selectorGuard.writeLock().unlock();
shutdownLock.writeLock().unlock();
}
} else {
// Give one more second.
@ -233,9 +203,20 @@ class NioWorker implements Runnable {
}
}
private void processTaskQueue() {
private void processRegisterTaskQueue() {
for (;;) {
final Runnable task = taskQueue.poll();
final Runnable task = registerTaskQueue.poll();
if (task == null) {
break;
}
task.run();
}
}
private void processWriteTaskQueue() {
for (;;) {
final Runnable task = writeTaskQueue.poll();
if (task == null) {
break;
}
@ -390,7 +371,7 @@ class NioWorker implements Runnable {
Thread workerThread = worker.thread;
if (workerThread != null && Thread.currentThread() != workerThread) {
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
worker.taskQueue.offer(channel.writeTask);
worker.writeTaskQueue.offer(channel.writeTask);
}
if (worker.wakenUp.compareAndSet(false, true)) {
worker.selector.wakeup();
@ -434,11 +415,11 @@ class NioWorker implements Runnable {
int bufIdx;
synchronized (channel.writeLock) {
FastQueue<MessageEvent> internalWriteBuffer = channel.writeBuffer;
FastQueue<MessageEvent> writeBuffer = channel.writeBuffer;
evt = channel.currentWriteEvent;
for (;;) {
if (evt == null) {
evt = internalWriteBuffer.poll();
evt = writeBuffer.poll();
if (evt == null) {
channel.currentWriteEvent = null;
removeOpWrite = true;
@ -510,11 +491,11 @@ class NioWorker implements Runnable {
int writtenBytes = 0;
synchronized (channel.writeLock) {
FastQueue<MessageEvent> internalWriteBuffer = channel.writeBuffer;
FastQueue<MessageEvent> writeBuffer = channel.writeBuffer;
evt = channel.currentWriteEvent;
for (;;) {
if (evt == null) {
evt = internalWriteBuffer.poll();
evt = writeBuffer.poll();
if (evt == null) {
channel.currentWriteEvent = null;
removeOpWrite = true;
@ -758,9 +739,9 @@ class NioWorker implements Runnable {
fireExceptionCaught(channel, cause);
}
FastQueue<MessageEvent> internalWriteBuffer = channel.writeBuffer;
FastQueue<MessageEvent> writeBuffer = channel.writeBuffer;
for (;;) {
evt = internalWriteBuffer.poll();
evt = writeBuffer.poll();
if (evt == null) {
break;
}
@ -837,4 +818,42 @@ class NioWorker implements Runnable {
fireExceptionCaught(channel, t);
}
}
private class RegisterTask implements Runnable {
private final Selector selector;
private final NioSocketChannel channel;
private final ChannelFuture future;
private final boolean server;
RegisterTask(
Selector selector,
NioSocketChannel channel, ChannelFuture future, boolean server) {
this.selector = selector;
this.channel = channel;
this.future = future;
this.server = server;
}
public void run() {
try {
channel.socket.register(selector, SelectionKey.OP_READ, channel);
if (future != null) {
future.setSuccess();
}
} catch (ClosedChannelException e) {
future.setFailure(e);
throw new ChannelException(
"Failed to register a socket to the selector.", e);
}
if (server) {
fireChannelOpen(channel);
fireChannelBound(channel, channel.getLocalAddress());
} else if (!((NioClientSocketChannel) channel).boundManually) {
fireChannelBound(channel, channel.getLocalAddress());
}
fireChannelConnected(channel, channel.getRemoteAddress());
}
}
}

View File

@ -71,6 +71,11 @@ public class FastQueue<E> {
return null;
}
public synchronized boolean isEmpty() {
return offeredElements == null &&
(drainedElements == null || drainedElements.isEmpty());
}
@SuppressWarnings("unchecked")
private E cast(Object o) {
return (E) o;