From 7a1963249df455fb075c80b824e1a0120155660b Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Mon, 16 Mar 2009 05:00:29 +0000 Subject: [PATCH] Ported high-low watermark to xnio --- .../netty/channel/xnio/BaseXnioChannel.java | 107 ++++++++++++++++-- .../channel/xnio/XnioClientChannelSink.java | 8 +- 2 files changed, 103 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/jboss/netty/channel/xnio/BaseXnioChannel.java b/src/main/java/org/jboss/netty/channel/xnio/BaseXnioChannel.java index eae5c63ad1..a5695b5d19 100644 --- a/src/main/java/org/jboss/netty/channel/xnio/BaseXnioChannel.java +++ b/src/main/java/org/jboss/netty/channel/xnio/BaseXnioChannel.java @@ -27,7 +27,9 @@ import static org.jboss.netty.channel.Channels.*; import java.net.SocketAddress; import java.nio.channels.GatheringByteChannel; import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; +import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.AbstractChannel; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; @@ -36,6 +38,7 @@ import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.util.LinkedTransferQueue; +import org.jboss.netty.util.ThreadLocalBoolean; import org.jboss.xnio.IoUtils; import org.jboss.xnio.channels.BoundChannel; import org.jboss.xnio.channels.ConnectedChannel; @@ -54,18 +57,12 @@ class BaseXnioChannel extends AbstractChannel implements XnioChannel { volatile java.nio.channels.Channel xnioChannel; final Object writeLock = new Object(); - final Queue writeBuffer = new LinkedTransferQueue(); + final Queue writeBuffer = new WriteBuffer(); + final AtomicInteger writeBufferSize = new AtomicInteger(); + final AtomicInteger highWaterMarkCounter = new AtomicInteger(); MessageEvent currentWriteEvent; int currentWriteIndex; - // TODO implement high / low water mark - - /** - * @param parent - * @param factory - * @param pipeline - * @param sink - */ BaseXnioChannel( Channel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, @@ -104,6 +101,45 @@ class BaseXnioChannel extends AbstractChannel implements XnioChannel { return getRemoteAddress() != null; } + @Override + public int getInterestOps() { + if (!isOpen()) { + return Channel.OP_WRITE; + } + + int interestOps = getRawInterestOps(); + int writeBufferSize = this.writeBufferSize.get(); + if (writeBufferSize != 0) { + if (highWaterMarkCounter.get() > 0) { + int lowWaterMark = getConfig().getWriteBufferLowWaterMark(); + if (writeBufferSize >= lowWaterMark) { + interestOps |= Channel.OP_WRITE; + } else { + interestOps &= ~Channel.OP_WRITE; + } + } else { + int highWaterMark = getConfig().getWriteBufferHighWaterMark(); + if (writeBufferSize >= highWaterMark) { + interestOps |= Channel.OP_WRITE; + } else { + interestOps &= ~Channel.OP_WRITE; + } + } + } else { + interestOps &= ~Channel.OP_WRITE; + } + + return interestOps; + } + + int getRawInterestOps() { + return super.getInterestOps(); + } + + void setRawInterestOpsNow(int interestOps) { + super.setInterestOpsNow(interestOps); + } + @Override public ChannelFuture write(Object message) { java.nio.channels.Channel xnioChannel = this.xnioChannel; @@ -160,4 +196,57 @@ class BaseXnioChannel extends AbstractChannel implements XnioChannel { fireChannelClosed(this); } + + private final class WriteBuffer extends LinkedTransferQueue { + + private final ThreadLocalBoolean notifying = new ThreadLocalBoolean(); + + WriteBuffer() { + super(); + } + + @Override + public boolean offer(MessageEvent e) { + boolean success = super.offer(e); + assert success; + + int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes(); + int newWriteBufferSize = writeBufferSize.addAndGet(messageSize); + int highWaterMark = getConfig().getWriteBufferHighWaterMark(); + + if (newWriteBufferSize >= highWaterMark) { + if (newWriteBufferSize - messageSize < highWaterMark) { + highWaterMarkCounter.incrementAndGet(); + if (!notifying.get()) { + notifying.set(Boolean.TRUE); + fireChannelInterestChanged(BaseXnioChannel.this); + notifying.set(Boolean.FALSE); + } + } + } + return true; + } + + @Override + public MessageEvent poll() { + MessageEvent e = super.poll(); + if (e != null) { + int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes(); + int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize); + int lowWaterMark = getConfig().getWriteBufferLowWaterMark(); + + if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) { + if (newWriteBufferSize + messageSize >= lowWaterMark) { + highWaterMarkCounter.decrementAndGet(); + if (!notifying.get()) { + notifying.set(Boolean.TRUE); + fireChannelInterestChanged(BaseXnioChannel.this); + notifying.set(Boolean.FALSE); + } + } + } + } + return e; + } + } } diff --git a/src/main/java/org/jboss/netty/channel/xnio/XnioClientChannelSink.java b/src/main/java/org/jboss/netty/channel/xnio/XnioClientChannelSink.java index 33804cb478..d45ae340d0 100644 --- a/src/main/java/org/jboss/netty/channel/xnio/XnioClientChannelSink.java +++ b/src/main/java/org/jboss/netty/channel/xnio/XnioClientChannelSink.java @@ -112,8 +112,10 @@ final class XnioClientChannelSink extends AbstractChannelSink { if (xnioChannel instanceof SuspendableReadChannel) { if ((interestOps & Channel.OP_READ) == 0) { ((SuspendableReadChannel) xnioChannel).suspendReads(); + channel.setRawInterestOpsNow(Channel.OP_NONE); } else { ((SuspendableReadChannel) xnioChannel).resumeReads(); + channel.setRawInterestOpsNow(Channel.OP_READ); } } e.getFuture().setSuccess(); @@ -138,13 +140,13 @@ final class XnioClientChannelSink extends AbstractChannelSink { @SuppressWarnings("unchecked") private static final class FutureConnectionNotifier implements Notifier { - + private final XnioClientChannel cc; - + FutureConnectionNotifier(XnioClientChannel cc) { this.cc = cc; } - + public void notify(IoFuture future, Object attachment) { ChannelFuture cf = (ChannelFuture) attachment; try {