* Removed a solved question from the comment

* Applied the same optimization applied to the TCP transport to the UDP transport
This commit is contained in:
Trustin Lee 2010-02-10 08:17:58 +00:00
parent bf721614ac
commit dd7f588916
4 changed files with 33 additions and 18 deletions

View File

@ -114,7 +114,8 @@ class NioDatagramChannel extends AbstractChannel
/**
* Boolean that indicates that write operation is in progress.
*/
volatile boolean inWriteNowLoop;
boolean inWriteNowLoop;
boolean writeSuspended;
private volatile InetSocketAddress localAddress;
volatile InetSocketAddress remoteAddress;
@ -316,7 +317,7 @@ class NioDatagramChannel extends AbstractChannel
public void run() {
writeTaskInTaskQueue.set(false);
worker.write(NioDatagramChannel.this, false);
worker.writeFromTaskLoop(NioDatagramChannel.this);
}
}

View File

@ -108,7 +108,7 @@ class NioDatagramPipelineSink extends AbstractChannelSink {
final MessageEvent event = (MessageEvent) e;
final boolean offered = channel.writeBufferQueue.offer(event);
assert offered;
channel.worker.write(channel, true);
channel.worker.writeFromUserCode(channel);
}
}

View File

@ -350,7 +350,7 @@ class NioDatagramWorker implements Runnable {
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
write(k);
writeFromSelectorLoop(k);
}
} catch (CancelledKeyException e) {
close(k);
@ -371,10 +371,6 @@ class NioDatagramWorker implements Runnable {
return false;
}
private void write(SelectionKey k) {
write((NioDatagramChannel) k.attachment(), false);
}
/**
* Read is called when a Selector has been notified that the underlying channel
* was something to be read. The channel would previously have registered its interest
@ -437,8 +433,7 @@ class NioDatagramWorker implements Runnable {
close(ch, succeededFuture(ch));
}
void write(final NioDatagramChannel channel,
final boolean mightNeedWakeup) {
void writeFromUserCode(final NioDatagramChannel channel) {
/*
* Note that we are not checking if the channel is connected. Connected
* has a different meaning in UDP and means that the channels socket is
@ -449,15 +444,33 @@ class NioDatagramWorker implements Runnable {
return;
}
if (mightNeedWakeup && scheduleWriteIfNecessary(channel)) {
if (scheduleWriteIfNecessary(channel)) {
return;
}
// From here, we are sure Thread.currentThread() == workerThread.
if (channel.writeSuspended) {
return;
}
if (channel.inWriteNowLoop) {
scheduleWriteIfNecessary(channel);
} else {
writeNow(channel, channel.getConfig().getWriteSpinCount());
return;
}
write0(channel);
}
void writeFromTaskLoop(final NioDatagramChannel ch) {
if (!ch.writeSuspended) {
write0(ch);
}
}
void writeFromSelectorLoop(final SelectionKey k) {
NioDatagramChannel ch = (NioDatagramChannel) k.attachment();
ch.writeSuspended = false;
write0(ch);
}
private boolean scheduleWriteIfNecessary(final NioDatagramChannel channel) {
@ -481,15 +494,15 @@ class NioDatagramWorker implements Runnable {
return false;
}
private void writeNow(final NioDatagramChannel channel,
final int writeSpinCount) {
private void write0(final NioDatagramChannel channel) {
boolean addOpWrite = false;
boolean removeOpWrite = false;
int writtenBytes = 0;
Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
final int writeSpinCount = channel.getConfig().getWriteSpinCount();
synchronized (channel.writeLock) {
// inform the channel that write is in-progress
channel.inWriteNowLoop = true;
@ -501,6 +514,7 @@ class NioDatagramWorker implements Runnable {
if (evt == null) {
if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
removeOpWrite = true;
channel.writeSuspended = false;
break;
}
@ -553,6 +567,7 @@ class NioDatagramWorker implements Runnable {
} else {
// Not written at all - perhaps the kernel buffer is full.
addOpWrite = true;
channel.writeSuspended = true;
break;
}
} catch (final AsynchronousCloseException e) {

View File

@ -386,7 +386,6 @@ class NioWorker implements Runnable {
}
if (channel.inWriteNowLoop) {
// Need to update the queue?
return;
}