Initial implementation of high/low water mark based write overflow prevention mechanism

This commit is contained in:
Trustin Lee 2008-11-28 03:19:31 +00:00
parent 18ac6f925f
commit 893cab5ce8
4 changed files with 143 additions and 120 deletions

View File

@ -25,6 +25,8 @@ package org.jboss.netty.channel.socket.nio;
import java.net.Socket;
import org.jboss.netty.channel.socket.DefaultSocketChannelConfig;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ConversionUtil;
/**
@ -39,10 +41,14 @@ import org.jboss.netty.util.ConversionUtil;
class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig
implements NioSocketChannelConfig {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(DefaultNioSocketChannelConfig.class);
private volatile int writeBufferHighWaterMark = 256 * 1024;
private volatile int writeBufferLowWaterMark = 64 * 1024;
private volatile ReceiveBufferSizePredictor predictor =
new DefaultReceiveBufferSizePredictor();
private volatile int writeSpinCount = 16;
private volatile boolean readWriteFair;
DefaultNioSocketChannelConfig(Socket socket) {
super(socket);
@ -55,7 +61,13 @@ class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig
}
if (key.equals("readWriteFair")) {
setReadWriteFair(ConversionUtil.toBoolean(value));
setReadWriteFair(true); // Deprecated
} else if (key.equals("writeBufferHighWaterMark")) {
// FIXME: low -> high
setWriteBufferHighWaterMark(ConversionUtil.toInt(value));
} else if (key.equals("writeBufferLowWaterMark")) {
// FIXME: high -> low
setWriteBufferLowWaterMark(ConversionUtil.toInt(value));
} else if (key.equals("writeSpinCount")) {
setWriteSpinCount(ConversionUtil.toInt(value));
} else if (key.equals("receiveBufferSizePredictor")) {
@ -66,6 +78,30 @@ class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig
return true;
}
public int getWriteBufferHighWaterMark() {
return writeBufferHighWaterMark;
}
public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
if (writeBufferHighWaterMark < 0) {
throw new IllegalArgumentException(
"writeBufferHighWaterMark: " + writeBufferHighWaterMark);
}
this.writeBufferHighWaterMark = writeBufferHighWaterMark;
}
public int getWriteBufferLowWaterMark() {
return writeBufferLowWaterMark;
}
public void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
if (writeBufferLowWaterMark < 0) {
throw new IllegalArgumentException(
"writeBufferLowWaterMark: " + writeBufferLowWaterMark);
}
this.writeBufferLowWaterMark = writeBufferLowWaterMark;
}
public int getWriteSpinCount() {
return writeSpinCount;
}
@ -91,10 +127,13 @@ class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig
}
public boolean isReadWriteFair() {
return readWriteFair;
logger.warn(
"Detected an access to a deprecated configuration parameter: " +
"readWriteFair");
return true;
}
public void setReadWriteFair(boolean readWriteFair) {
this.readWriteFair = readWriteFair;
isReadWriteFair();
}
}

View File

@ -27,7 +27,9 @@ import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
@ -55,7 +57,32 @@ abstract class NioSocketChannel extends AbstractChannel
final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
final Runnable writeTask = new WriteTask();
final Queue<MessageEvent> writeBuffer = new LinkedTransferQueue<MessageEvent>();
final AtomicInteger writeBufferSize = new AtomicInteger();
final Queue<MessageEvent> writeBuffer = new LinkedTransferQueue<MessageEvent>() {
@Override
public boolean offer(MessageEvent e) {
boolean success = super.offer(e);
assert success;
writeBufferSize.addAndGet(
((ChannelBuffer) e.getMessage()).readableBytes());
return true;
}
@Override
public MessageEvent poll() {
MessageEvent e = super.poll();
if (e != null) {
int newWriteBufferSize = writeBufferSize.addAndGet(
-((ChannelBuffer) e.getMessage()).readableBytes());
if (newWriteBufferSize <= getConfig().getWriteBufferLowWaterMark()) {
mightNeedToNotifyUnwritability = true;
}
}
return e;
}
};
boolean wasWritable;
boolean mightNeedToNotifyUnwritability;
MessageEvent currentWriteEvent;
int currentWriteIndex;
@ -93,13 +120,31 @@ abstract class NioSocketChannel extends AbstractChannel
}
@Override
protected boolean setClosed() {
return super.setClosed();
public int getInterestOps() {
if (!isOpen()) {
return Channel.OP_WRITE;
}
int interestOps = getRawInterestOps();
if (writeBufferSize.get() >= getConfig().getWriteBufferHighWaterMark()) {
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;
}
return interestOps;
}
int getRawInterestOps() {
return super.getInterestOps();
}
void setRawInterestOpsNow(int interestOps) {
super.setInterestOpsNow(interestOps);
}
@Override
protected void setInterestOpsNow(int interestOps) {
super.setInterestOpsNow(interestOps);
protected boolean setClosed() {
return super.setClosed();
}
@Override

View File

@ -59,6 +59,12 @@ import org.jboss.netty.channel.socket.SocketChannelConfig;
*/
public interface NioSocketChannelConfig extends SocketChannelConfig {
int getWriteBufferHighWaterMark();
void setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
int getWriteBufferLowWaterMark();
void setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
/**
* Returns the maximum loop count for a write operation until
* {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
@ -96,6 +102,9 @@ public interface NioSocketChannelConfig extends SocketChannelConfig {
void setReceiveBufferSizePredictor(ReceiveBufferSizePredictor predictor);
/**
* @deprecated This property has been replaced by the
* {@code writeBufferHighWaterMark} and {@code writeBufferLowWaterMark}.
*
* Returns {@code true} if and only if an I/O thread should do its effort
* to balance the ratio of read and write operations. Assuring
* the read-write fairness is sometimes necessary in a high speed network
@ -103,9 +112,13 @@ public interface NioSocketChannelConfig extends SocketChannelConfig {
* large number of write requests not giving enough time for other channels
* to perform I/O. The default value is {@code false}.
*/
@Deprecated
boolean isReadWriteFair();
/**
* @deprecated This property has been replaced by the
* {@code writeBufferHighWaterMark} and {@code writeBufferLowWaterMark}.
*
* Sets if an I/O thread should balance the ratio of read and write
* operations. Assuring the read-write fairness is sometimes necessary
* in a high speed network because a certain channel can spend too much
@ -113,5 +126,6 @@ public interface NioSocketChannelConfig extends SocketChannelConfig {
* time for other channels to perform I/O. The default value is
* {@code false}.
*/
@Deprecated
void setReadWriteFair(boolean fair);
}

View File

@ -382,12 +382,6 @@ class NioWorker implements Runnable {
if (worker != null) {
Thread workerThread = worker.thread;
if (workerThread != null && Thread.currentThread() != workerThread) {
if (!channel.isWritable()) {
// Will be written by the worker thread
// when the channel is ready to write.
return;
}
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
worker.writeTaskQueue.offer(channel.writeTask);
}
@ -402,24 +396,10 @@ class NioWorker implements Runnable {
}
}
final NioSocketChannelConfig cfg = channel.getConfig();
final int writeSpinCount = cfg.getWriteSpinCount();
final int maxWrittenBytes;
if (cfg.isReadWriteFair()) {
// Set limitation for the number of written bytes for read-write
// fairness. I used maxReadBufferSize * 3 / 2, which yields best
// performance in my experience while not breaking fairness much.
int previousReceiveBufferSize =
cfg.getReceiveBufferSizePredictor().nextReceiveBufferSize();
maxWrittenBytes = previousReceiveBufferSize + previousReceiveBufferSize >>> 1;
writeFair(channel, mightNeedWakeup, writeSpinCount, maxWrittenBytes);
} else {
writeUnfair(channel, mightNeedWakeup, writeSpinCount);
}
writeNow(channel, mightNeedWakeup, channel.getConfig().getWriteSpinCount());
}
private static void writeUnfair(NioSocketChannel channel,
private static void writeNow(NioSocketChannel channel,
boolean mightNeedWakeup, final int writeSpinCount) {
boolean open = true;
@ -492,86 +472,31 @@ class NioWorker implements Runnable {
} else if (removeOpWrite) {
setOpWrite(channel, false, mightNeedWakeup);
}
fireChannelInterestChangedIfNecessary(channel);
}
}
private static void writeFair(NioSocketChannel channel,
boolean mightNeedWakeup, final int writeSpinCount,
final int maxWrittenBytes) {
boolean open = true;
boolean addOpWrite = false;
boolean removeOpWrite = false;
MessageEvent evt;
ChannelBuffer buf;
int bufIdx;
int writtenBytes = 0;
synchronized (channel.writeLock) {
Queue<MessageEvent> writeBuffer = channel.writeBuffer;
evt = channel.currentWriteEvent;
for (;;) {
if (evt == null) {
evt = writeBuffer.poll();
if (evt == null) {
channel.currentWriteEvent = null;
removeOpWrite = true;
break;
}
buf = (ChannelBuffer) evt.getMessage();
bufIdx = buf.readerIndex();
} else {
buf = (ChannelBuffer) evt.getMessage();
bufIdx = channel.currentWriteIndex;
}
try {
for (int i = writeSpinCount; i > 0; i --) {
int localWrittenBytes = buf.getBytes(
bufIdx,
channel.socket,
Math.min(
maxWrittenBytes - writtenBytes,
buf.writerIndex() - bufIdx));
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
bufIdx += localWrittenBytes;
break;
}
}
if (bufIdx == buf.writerIndex()) {
// Successful write - proceed to the next message.
evt.getFuture().setSuccess();
evt = null;
} else {
// Not written fully - perhaps the kernel buffer is full.
channel.currentWriteEvent = evt;
channel.currentWriteIndex = bufIdx;
addOpWrite = true;
break;
}
} catch (AsynchronousCloseException e) {
// Doesn't need a user attention - ignore.
} catch (Throwable t) {
evt.getFuture().setFailure(t);
evt = null;
fireExceptionCaught(channel, t);
if (t instanceof IOException) {
open = false;
close(channel, channel.getSucceededFuture());
}
private static void fireChannelInterestChangedIfNecessary(
NioSocketChannel channel) {
int interestOps = channel.getRawInterestOps();
boolean wasWritable = channel.wasWritable;
boolean writable = channel.wasWritable = channel.isWritable();
if (wasWritable) {
if (writable) {
if (channel.mightNeedToNotifyUnwritability) {
channel.mightNeedToNotifyUnwritability = false;
fireChannelInterestChanged(channel, interestOps | Channel.OP_WRITE);
fireChannelInterestChanged(channel, interestOps & ~Channel.OP_WRITE);
}
} else {
fireChannelInterestChanged(channel, interestOps | Channel.OP_WRITE);
}
}
if (open) {
if (addOpWrite) {
setOpWrite(channel, true, mightNeedWakeup);
} else if (removeOpWrite) {
setOpWrite(channel, false, mightNeedWakeup);
} else {
if (writable) {
fireChannelInterestChanged(channel, interestOps & ~Channel.OP_WRITE);
} else {
fireChannelInterestChanged(channel, interestOps | Channel.OP_WRITE);
}
}
}
@ -603,7 +528,7 @@ class NioWorker implements Runnable {
synchronized (channel.interestOpsLock) {
if (opWrite) {
if (!mightNeedWakeup) {
interestOps = channel.getInterestOps();
interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps);
@ -612,7 +537,7 @@ class NioWorker implements Runnable {
} else {
switch (CONSTRAINT_LEVEL) {
case 0:
interestOps = channel.getInterestOps();
interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps);
@ -625,7 +550,7 @@ class NioWorker implements Runnable {
break;
case 1:
case 2:
interestOps = channel.getInterestOps();
interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
if (Thread.currentThread() == worker.thread) {
interestOps |= SelectionKey.OP_WRITE;
@ -652,7 +577,7 @@ class NioWorker implements Runnable {
}
} else {
if (!mightNeedWakeup) {
interestOps = channel.getInterestOps();
interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
@ -661,7 +586,7 @@ class NioWorker implements Runnable {
} else {
switch (CONSTRAINT_LEVEL) {
case 0:
interestOps = channel.getInterestOps();
interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
@ -674,7 +599,7 @@ class NioWorker implements Runnable {
break;
case 1:
case 2:
interestOps = channel.getInterestOps();
interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
if (Thread.currentThread() == worker.thread) {
interestOps &= ~SelectionKey.OP_WRITE;
@ -703,8 +628,8 @@ class NioWorker implements Runnable {
}
if (changed) {
channel.setInterestOpsNow(interestOps);
fireChannelInterestChanged(channel, interestOps);
channel.setRawInterestOpsNow(interestOps);
//fireChannelInterestChanged(channel, interestOps);
}
}
@ -725,8 +650,8 @@ class NioWorker implements Runnable {
if (channel.setClosed()) {
future.setSuccess();
if (connected) {
if (channel.getInterestOps() != Channel.OP_WRITE) {
channel.setInterestOpsNow(Channel.OP_WRITE);
if (channel.getRawInterestOps() != Channel.OP_WRITE) {
channel.setRawInterestOpsNow(Channel.OP_WRITE);
fireChannelInterestChanged(channel, Channel.OP_WRITE);
}
fireChannelDisconnected(channel);
@ -805,11 +730,11 @@ class NioWorker implements Runnable {
synchronized (channel.interestOpsLock) {
// Override OP_WRITE flag - a user cannot change this flag.
interestOps &= ~Channel.OP_WRITE;
interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE;
switch (CONSTRAINT_LEVEL) {
case 0:
if (channel.getInterestOps() != interestOps) {
if (channel.getRawInterestOps() != interestOps) {
key.interestOps(interestOps);
if (Thread.currentThread() != worker.thread &&
worker.wakenUp.compareAndSet(false, true)) {
@ -820,7 +745,7 @@ class NioWorker implements Runnable {
break;
case 1:
case 2:
if (channel.getInterestOps() != interestOps) {
if (channel.getRawInterestOps() != interestOps) {
if (Thread.currentThread() == worker.thread) {
key.interestOps(interestOps);
changed = true;
@ -845,7 +770,7 @@ class NioWorker implements Runnable {
future.setSuccess();
if (changed) {
channel.setInterestOpsNow(interestOps);
channel.setRawInterestOpsNow(interestOps);
fireChannelInterestChanged(channel, interestOps);
}
} catch (Throwable t) {