Fixed throughput issue when messages are written from different threads than I/O worker threads

This commit is contained in:
Trustin Lee 2008-09-26 03:02:31 +00:00
parent ebdcb21421
commit 5f1ecc9022
2 changed files with 42 additions and 1 deletions

View File

@ -49,6 +49,7 @@ abstract class NioSocketChannel extends AbstractChannel
final SocketChannel socket; final SocketChannel socket;
private final NioSocketChannelConfig config; private final NioSocketChannelConfig config;
final Runnable writeTask = new WriteTask();
final Queue<MessageEvent> writeBuffer = final Queue<MessageEvent> writeBuffer =
new ConcurrentLinkedQueue<MessageEvent>(); new ConcurrentLinkedQueue<MessageEvent>();
MessageEvent currentWriteEvent; MessageEvent currentWriteEvent;
@ -110,4 +111,15 @@ abstract class NioSocketChannel extends AbstractChannel
return getUnsupportedOperationFuture(); return getUnsupportedOperationFuture();
} }
} }
private class WriteTask implements Runnable {
WriteTask() {
super();
}
public void run() {
NioWorker.write(NioSocketChannel.this, false);
}
}
} }

View File

@ -32,7 +32,9 @@ import java.nio.channels.ScatteringByteChannel;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.util.Iterator; import java.util.Iterator;
import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -72,6 +74,7 @@ class NioWorker implements Runnable {
volatile Selector selector; volatile Selector selector;
final AtomicBoolean wakenUp = new AtomicBoolean(); final AtomicBoolean wakenUp = new AtomicBoolean();
final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
NioWorker(int bossId, int id, Executor executor) { NioWorker(int bossId, int id, Executor executor) {
this.bossId = bossId; this.bossId = bossId;
@ -172,6 +175,9 @@ class NioWorker implements Runnable {
try { try {
int selectedKeyCount = selector.select(500); int selectedKeyCount = selector.select(500);
processTaskQueue();
if (selectedKeyCount > 0) { if (selectedKeyCount > 0) {
processSelectedKeys(selector.selectedKeys()); processSelectedKeys(selector.selectedKeys());
} }
@ -225,6 +231,17 @@ class NioWorker implements Runnable {
} }
} }
private void processTaskQueue() {
for (;;) {
final Runnable task = taskQueue.poll();
if (task == null) {
break;
}
task.run();
}
}
private static void processSelectedKeys(Set<SelectionKey> selectedKeys) { private static void processSelectedKeys(Set<SelectionKey> selectedKeys) {
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) { for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next(); SelectionKey k = i.next();
@ -298,7 +315,19 @@ class NioWorker implements Runnable {
close(ch, ch.getSucceededFuture()); close(ch, ch.getSucceededFuture());
} }
static void write(NioSocketChannel channel, boolean mightNeedWakeup) { // FIXME I/O 스레드냐 아니냐에 따라서 task queue 안넣거나 넣거나 IoSocketHandler, IoSocketDispatcher
static void write(final NioSocketChannel channel, boolean mightNeedWakeup) {
if (mightNeedWakeup) {
NioWorker worker = channel.getWorker();
if (worker != null && Thread.currentThread() != worker.thread) {
worker.taskQueue.offer(channel.writeTask);
if (worker.wakenUp.compareAndSet(false, true)) {
worker.selector.wakeup();
}
return;
}
}
if (!channel.isConnected()) { if (!channel.isConnected()) {
cleanUpWriteBuffer(channel); cleanUpWriteBuffer(channel);
return; return;