Ported high-low watermark to xnio
This commit is contained in:
parent
4fa8e3adfc
commit
7a1963249d
@ -27,7 +27,9 @@ import static org.jboss.netty.channel.Channels.*;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.channels.GatheringByteChannel;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.AbstractChannel;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelFactory;
|
||||
@ -36,6 +38,7 @@ import org.jboss.netty.channel.ChannelPipeline;
|
||||
import org.jboss.netty.channel.ChannelSink;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.util.LinkedTransferQueue;
|
||||
import org.jboss.netty.util.ThreadLocalBoolean;
|
||||
import org.jboss.xnio.IoUtils;
|
||||
import org.jboss.xnio.channels.BoundChannel;
|
||||
import org.jboss.xnio.channels.ConnectedChannel;
|
||||
@ -54,18 +57,12 @@ class BaseXnioChannel extends AbstractChannel implements XnioChannel {
|
||||
volatile java.nio.channels.Channel xnioChannel;
|
||||
|
||||
final Object writeLock = new Object();
|
||||
final Queue<MessageEvent> writeBuffer = new LinkedTransferQueue<MessageEvent>();
|
||||
final Queue<MessageEvent> writeBuffer = new WriteBuffer();
|
||||
final AtomicInteger writeBufferSize = new AtomicInteger();
|
||||
final AtomicInteger highWaterMarkCounter = new AtomicInteger();
|
||||
MessageEvent currentWriteEvent;
|
||||
int currentWriteIndex;
|
||||
|
||||
// TODO implement high / low water mark
|
||||
|
||||
/**
|
||||
* @param parent
|
||||
* @param factory
|
||||
* @param pipeline
|
||||
* @param sink
|
||||
*/
|
||||
BaseXnioChannel(
|
||||
Channel parent, ChannelFactory factory,
|
||||
ChannelPipeline pipeline, ChannelSink sink,
|
||||
@ -104,6 +101,45 @@ class BaseXnioChannel extends AbstractChannel implements XnioChannel {
|
||||
return getRemoteAddress() != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getInterestOps() {
|
||||
if (!isOpen()) {
|
||||
return Channel.OP_WRITE;
|
||||
}
|
||||
|
||||
int interestOps = getRawInterestOps();
|
||||
int writeBufferSize = this.writeBufferSize.get();
|
||||
if (writeBufferSize != 0) {
|
||||
if (highWaterMarkCounter.get() > 0) {
|
||||
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
|
||||
if (writeBufferSize >= lowWaterMark) {
|
||||
interestOps |= Channel.OP_WRITE;
|
||||
} else {
|
||||
interestOps &= ~Channel.OP_WRITE;
|
||||
}
|
||||
} else {
|
||||
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
|
||||
if (writeBufferSize >= highWaterMark) {
|
||||
interestOps |= Channel.OP_WRITE;
|
||||
} else {
|
||||
interestOps &= ~Channel.OP_WRITE;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
interestOps &= ~Channel.OP_WRITE;
|
||||
}
|
||||
|
||||
return interestOps;
|
||||
}
|
||||
|
||||
int getRawInterestOps() {
|
||||
return super.getInterestOps();
|
||||
}
|
||||
|
||||
void setRawInterestOpsNow(int interestOps) {
|
||||
super.setInterestOpsNow(interestOps);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture write(Object message) {
|
||||
java.nio.channels.Channel xnioChannel = this.xnioChannel;
|
||||
@ -160,4 +196,57 @@ class BaseXnioChannel extends AbstractChannel implements XnioChannel {
|
||||
|
||||
fireChannelClosed(this);
|
||||
}
|
||||
|
||||
private final class WriteBuffer extends LinkedTransferQueue<MessageEvent> {
|
||||
|
||||
private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
|
||||
|
||||
WriteBuffer() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean offer(MessageEvent e) {
|
||||
boolean success = super.offer(e);
|
||||
assert success;
|
||||
|
||||
int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes();
|
||||
int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
|
||||
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
|
||||
|
||||
if (newWriteBufferSize >= highWaterMark) {
|
||||
if (newWriteBufferSize - messageSize < highWaterMark) {
|
||||
highWaterMarkCounter.incrementAndGet();
|
||||
if (!notifying.get()) {
|
||||
notifying.set(Boolean.TRUE);
|
||||
fireChannelInterestChanged(BaseXnioChannel.this);
|
||||
notifying.set(Boolean.FALSE);
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageEvent poll() {
|
||||
MessageEvent e = super.poll();
|
||||
if (e != null) {
|
||||
int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes();
|
||||
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
|
||||
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
|
||||
|
||||
if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
|
||||
if (newWriteBufferSize + messageSize >= lowWaterMark) {
|
||||
highWaterMarkCounter.decrementAndGet();
|
||||
if (!notifying.get()) {
|
||||
notifying.set(Boolean.TRUE);
|
||||
fireChannelInterestChanged(BaseXnioChannel.this);
|
||||
notifying.set(Boolean.FALSE);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -112,8 +112,10 @@ final class XnioClientChannelSink extends AbstractChannelSink {
|
||||
if (xnioChannel instanceof SuspendableReadChannel) {
|
||||
if ((interestOps & Channel.OP_READ) == 0) {
|
||||
((SuspendableReadChannel) xnioChannel).suspendReads();
|
||||
channel.setRawInterestOpsNow(Channel.OP_NONE);
|
||||
} else {
|
||||
((SuspendableReadChannel) xnioChannel).resumeReads();
|
||||
channel.setRawInterestOpsNow(Channel.OP_READ);
|
||||
}
|
||||
}
|
||||
e.getFuture().setSuccess();
|
||||
|
Loading…
Reference in New Issue
Block a user