Rewrite ChannelOutboundBuffer
- Merge MessageList into ChannelOutboundBuffer - Make ChannelOutboundBuffer a queue-like data structure so that it is nearly impossible to leak a message - Make ChannelOutboundBuffer public so that AbstractChannel can expose it to its subclasses. - TODO: Re-enable gathering write in NioSocketChannel
This commit is contained in:
parent
fe59ab1b9c
commit
bcef796dc7
@ -292,10 +292,14 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWriteMessages(Object[] msgs, int msgLength, int startIndex, boolean lastSpin) throws Exception {
|
||||
SctpMessage packet = (SctpMessage) msgs[startIndex];
|
||||
protected boolean doWriteMessage(Object msg) throws Exception {
|
||||
SctpMessage packet = (SctpMessage) msg;
|
||||
ByteBuf data = packet.content();
|
||||
int dataLen = data.readableBytes();
|
||||
if (dataLen == 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
ByteBuffer nioData;
|
||||
if (data.nioBufferCount() == 1) {
|
||||
nioData = data.nioBuffer();
|
||||
@ -311,32 +315,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
|
||||
|
||||
final int writtenBytes = javaChannel().send(nioData, mi);
|
||||
|
||||
final SelectionKey key = selectionKey();
|
||||
final int interestOps = key.interestOps();
|
||||
if (writtenBytes <= 0 && dataLen > 0) {
|
||||
// Did not write a packet.
|
||||
// 1) If 'lastSpin' is false, the caller will call this method again real soon.
|
||||
// - Do not update OP_WRITE.
|
||||
// 2) If 'lastSpin' is true, the caller will not retry.
|
||||
// - Set OP_WRITE so that the event loop calls flushForcibly() later.
|
||||
if (lastSpin) {
|
||||
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
|
||||
key.interestOps(interestOps | SelectionKey.OP_WRITE);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// packet was written free up buffer
|
||||
packet.release();
|
||||
|
||||
if (msgLength == 1) {
|
||||
// Wrote the outbound buffer completely - clear OP_WRITE.
|
||||
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
|
||||
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
|
||||
}
|
||||
}
|
||||
return 1;
|
||||
return writtenBytes > 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -216,7 +216,7 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWriteMessages(Object[] msgs, int msgLength, int startIndex, boolean lastSpin) throws Exception {
|
||||
protected boolean doWriteMessage(Object msg) throws Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.RecvByteBufAllocator;
|
||||
import io.netty.channel.oio.AbstractOioMessageChannel;
|
||||
@ -219,59 +220,57 @@ public class OioSctpChannel extends AbstractOioMessageChannel
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWrite(Object[] msgs, int length, int startIndex) throws Exception {
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||
if (!writeSelector.isOpen()) {
|
||||
return 0;
|
||||
return;
|
||||
}
|
||||
final int size = in.size();
|
||||
final int selectedKeys = writeSelector.select(SO_TIMEOUT);
|
||||
if (selectedKeys > 0) {
|
||||
final Set<SelectionKey> writableKeys = writeSelector.selectedKeys();
|
||||
if (writableKeys.isEmpty()) {
|
||||
return 0;
|
||||
return;
|
||||
}
|
||||
Iterator<SelectionKey> writableKeysIt = writableKeys.iterator();
|
||||
int written = 0;
|
||||
for (;;) {
|
||||
if (written == length) {
|
||||
if (written == size) {
|
||||
// all written
|
||||
return written;
|
||||
return;
|
||||
}
|
||||
writableKeysIt.next();
|
||||
writableKeysIt.remove();
|
||||
|
||||
SctpMessage packet = (SctpMessage) msgs[startIndex ++];
|
||||
SctpMessage packet = (SctpMessage) in.current();
|
||||
if (packet == null) {
|
||||
return written;
|
||||
return;
|
||||
}
|
||||
try {
|
||||
ByteBuf data = packet.content();
|
||||
int dataLen = data.readableBytes();
|
||||
ByteBuffer nioData;
|
||||
|
||||
if (data.nioBufferCount() != -1) {
|
||||
nioData = data.nioBuffer();
|
||||
} else {
|
||||
nioData = ByteBuffer.allocate(dataLen);
|
||||
data.getBytes(data.readerIndex(), nioData);
|
||||
nioData.flip();
|
||||
}
|
||||
ByteBuf data = packet.content();
|
||||
int dataLen = data.readableBytes();
|
||||
ByteBuffer nioData;
|
||||
|
||||
final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
|
||||
mi.payloadProtocolID(packet.protocolIdentifier());
|
||||
mi.streamNumber(packet.streamIdentifier());
|
||||
|
||||
ch.send(nioData, mi);
|
||||
written ++;
|
||||
} finally {
|
||||
packet.release();
|
||||
if (data.nioBufferCount() != -1) {
|
||||
nioData = data.nioBuffer();
|
||||
} else {
|
||||
nioData = ByteBuffer.allocate(dataLen);
|
||||
data.getBytes(data.readerIndex(), nioData);
|
||||
nioData.flip();
|
||||
}
|
||||
|
||||
final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
|
||||
mi.payloadProtocolID(packet.protocolIdentifier());
|
||||
mi.streamNumber(packet.streamIdentifier());
|
||||
|
||||
ch.send(nioData, mi);
|
||||
written ++;
|
||||
in.remove();
|
||||
|
||||
if (!writableKeysIt.hasNext()) {
|
||||
return written;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -20,6 +20,7 @@ import com.sun.nio.sctp.SctpServerChannel;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.oio.AbstractOioMessageChannel;
|
||||
import io.netty.channel.sctp.DefaultSctpServerChannelConfig;
|
||||
@ -285,7 +286,7 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception {
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
@ -93,7 +93,7 @@ public abstract class NioUdtAcceptorChannel extends AbstractNioMessageChannel im
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWriteMessages(Object[] msgs, int msgLength, int startIndex, boolean lastSpin) throws Exception {
|
||||
protected boolean doWriteMessage(Object msg) throws Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
|
@ -140,15 +140,14 @@ public class NioUdtByteConnectorChannel extends AbstractNioByteChannel implement
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWriteBytes(final ByteBuf byteBuf, final boolean lastSpin) throws Exception {
|
||||
protected int doWriteBytes(final ByteBuf byteBuf) throws Exception {
|
||||
final int expectedWrittenBytes = byteBuf.readableBytes();
|
||||
final int writtenBytes = byteBuf.readBytes(javaChannel(), expectedWrittenBytes);
|
||||
updateOpWrite(expectedWrittenBytes, writtenBytes, lastSpin);
|
||||
return writtenBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long doWriteFileRegion(FileRegion region, boolean lastSpin) throws Exception {
|
||||
protected long doWriteFileRegion(FileRegion region) throws Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
|
@ -31,7 +31,6 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.util.List;
|
||||
|
||||
import static java.nio.channels.SelectionKey.*;
|
||||
@ -167,9 +166,9 @@ public class NioUdtMessageConnectorChannel extends AbstractNioMessageChannel imp
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWriteMessages(Object[] msgs, int msgLength, int startIndex, boolean lastSpin) throws Exception {
|
||||
protected boolean doWriteMessage(Object msg) throws Exception {
|
||||
// expects a message
|
||||
final UdtMessage message = (UdtMessage) msgs[startIndex];
|
||||
final UdtMessage message = (UdtMessage) msg;
|
||||
|
||||
final ByteBuf byteBuf = message.content();
|
||||
|
||||
@ -182,17 +181,9 @@ public class NioUdtMessageConnectorChannel extends AbstractNioMessageChannel imp
|
||||
writtenBytes = javaChannel().write(byteBuf.nioBuffers());
|
||||
}
|
||||
|
||||
final SelectionKey key = selectionKey();
|
||||
final int interestOps = key.interestOps();
|
||||
|
||||
// did not write the message
|
||||
if (writtenBytes <= 0 && messageSize > 0) {
|
||||
if (lastSpin) {
|
||||
if ((interestOps & OP_WRITE) == 0) {
|
||||
key.interestOps(interestOps | OP_WRITE);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
return false;
|
||||
}
|
||||
|
||||
// wrote message completely
|
||||
@ -201,16 +192,7 @@ public class NioUdtMessageConnectorChannel extends AbstractNioMessageChannel imp
|
||||
"Provider error: failed to write message. Provider library should be upgraded.");
|
||||
}
|
||||
|
||||
// wrote the message queue completely - clear OP_WRITE.
|
||||
if (msgLength == 1) {
|
||||
if ((interestOps & OP_WRITE) != 0) {
|
||||
key.interestOps(interestOps & ~OP_WRITE);
|
||||
}
|
||||
}
|
||||
|
||||
message.release();
|
||||
|
||||
return 1;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -516,12 +516,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
promise.setFailure(t);
|
||||
}
|
||||
|
||||
if (!outboundBuffer.isEmpty()) {
|
||||
// fail all queued messages
|
||||
outboundBuffer.fail(CLOSED_CHANNEL_EXCEPTION);
|
||||
}
|
||||
|
||||
outboundBuffer.clearUnflushed(CLOSED_CHANNEL_EXCEPTION);
|
||||
// fail all queued messages
|
||||
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
|
||||
outboundBuffer.failUnflushed(CLOSED_CHANNEL_EXCEPTION);
|
||||
|
||||
if (wasActive && !isActive()) {
|
||||
invokeLater(new Runnable() {
|
||||
@ -628,59 +625,28 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
return;
|
||||
}
|
||||
|
||||
inFlush0 = true;
|
||||
|
||||
final ChannelOutboundBuffer outboundBuffer = AbstractChannel.this.outboundBuffer;
|
||||
if (outboundBuffer.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
inFlush0 = true;
|
||||
|
||||
// Mark all pending write requests as failure if the channel is inactive.
|
||||
if (!isActive()) {
|
||||
if (isOpen()) {
|
||||
outboundBuffer.fail(NOT_YET_CONNECTED_EXCEPTION);
|
||||
outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION);
|
||||
} else {
|
||||
outboundBuffer.fail(CLOSED_CHANNEL_EXCEPTION);
|
||||
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
|
||||
}
|
||||
inFlush0 = false;
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
for (;;) {
|
||||
MessageList messages = outboundBuffer.currentMessageList;
|
||||
if (messages == null) {
|
||||
if (!outboundBuffer.next()) {
|
||||
break;
|
||||
}
|
||||
messages = outboundBuffer.currentMessageList;
|
||||
}
|
||||
|
||||
int messageIndex = outboundBuffer.currentMessageIndex;
|
||||
int messageCount = messages.size();
|
||||
Object[] messageArray = messages.messages();
|
||||
ChannelPromise[] promiseArray = messages.promises();
|
||||
|
||||
// Write the messages.
|
||||
final int writtenMessages = doWrite(messageArray, messageCount, messageIndex);
|
||||
|
||||
// Notify the promises.
|
||||
final int newMessageIndex = messageIndex + writtenMessages;
|
||||
for (int i = messageIndex; i < newMessageIndex; i ++) {
|
||||
promiseArray[i].trySuccess();
|
||||
}
|
||||
|
||||
// Update the index variable and decide what to do next.
|
||||
outboundBuffer.currentMessageIndex = messageIndex = newMessageIndex;
|
||||
if (messageIndex >= messageCount) {
|
||||
messages.recycle();
|
||||
if (!outboundBuffer.next()) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// Could not flush the current write request completely. Try again later.
|
||||
break;
|
||||
}
|
||||
}
|
||||
doWrite(outboundBuffer);
|
||||
} catch (Throwable t) {
|
||||
outboundBuffer.fail(t);
|
||||
outboundBuffer.failFlushed(t);
|
||||
if (t instanceof IOException) {
|
||||
close(voidPromise());
|
||||
}
|
||||
@ -790,13 +756,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
protected abstract void doBeginRead() throws Exception;
|
||||
|
||||
/**
|
||||
* Flush the content of the given {@link ByteBuf} to the remote peer.
|
||||
* Flush the content of the given buffer to the remote peer.
|
||||
*
|
||||
* Sub-classes may override this as this implementation will just thrown an {@link UnsupportedOperationException}
|
||||
*
|
||||
* @return the number of written messages
|
||||
*/
|
||||
protected abstract int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception;
|
||||
protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
|
||||
|
||||
protected static void checkEOF(FileRegion region) throws IOException {
|
||||
if (region.transfered() < region.count()) {
|
||||
|
@ -67,7 +67,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception {
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
|
@ -23,35 +23,38 @@ import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||
|
||||
final class ChannelOutboundBuffer {
|
||||
public final class ChannelOutboundBuffer {
|
||||
|
||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
|
||||
|
||||
private static final int MIN_INITIAL_CAPACITY = 8;
|
||||
|
||||
MessageList currentMessageList;
|
||||
int currentMessageIndex;
|
||||
private long currentMessageListSize;
|
||||
|
||||
private MessageList[] messageLists;
|
||||
private long[] messageListSizes;
|
||||
|
||||
private MessageList unflushedMessageList;
|
||||
private long unflushedMessageListSize;
|
||||
|
||||
private int head;
|
||||
private int tail;
|
||||
private boolean inFail;
|
||||
private final AbstractChannel channel;
|
||||
|
||||
// Flushed messages are stored in a circulas queue.
|
||||
private Object[] flushed;
|
||||
private ChannelPromise[] flushedPromises;
|
||||
private long[] flushedProgresses;
|
||||
private long[] flushedTotals;
|
||||
private int head;
|
||||
private int tail;
|
||||
|
||||
// Unflushed messages are stored in an array list.
|
||||
private Object[] unflushed;
|
||||
private ChannelPromise[] unflushedPromises;
|
||||
private long[] unflushedTotals;
|
||||
private int unflushedCount;
|
||||
|
||||
private boolean inFail;
|
||||
private long pendingOutboundBytes;
|
||||
|
||||
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> WRITABLE_UPDATER =
|
||||
AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "writable");
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
@SuppressWarnings({ "unused", "FieldMayBeFinal" })
|
||||
private volatile int writable = 1;
|
||||
|
||||
ChannelOutboundBuffer(AbstractChannel channel) {
|
||||
@ -82,39 +85,126 @@ final class ChannelOutboundBuffer {
|
||||
|
||||
this.channel = channel;
|
||||
|
||||
messageLists = new MessageList[initialCapacity];
|
||||
messageListSizes = new long[initialCapacity];
|
||||
flushed = new Object[initialCapacity];
|
||||
flushedPromises = new ChannelPromise[initialCapacity];
|
||||
flushedProgresses = new long[initialCapacity];
|
||||
flushedTotals = new long[initialCapacity];
|
||||
|
||||
unflushed = new Object[initialCapacity];
|
||||
unflushedPromises = new ChannelPromise[initialCapacity];
|
||||
unflushedTotals = new long[initialCapacity];
|
||||
}
|
||||
|
||||
void addMessage(Object msg, ChannelPromise promise) {
|
||||
MessageList unflushedMessageList = this.unflushedMessageList;
|
||||
if (unflushedMessageList == null) {
|
||||
this.unflushedMessageList = unflushedMessageList = MessageList.newInstance();
|
||||
Object[] unflushed = this.unflushed;
|
||||
int unflushedCount = this.unflushedCount;
|
||||
if (unflushedCount == unflushed.length - 1) {
|
||||
doubleUnflushedCapacity();
|
||||
unflushed = this.unflushed;
|
||||
}
|
||||
|
||||
unflushedMessageList.add(msg, promise);
|
||||
|
||||
int size = channel.calculateMessageSize(msg);
|
||||
unflushedMessageListSize += size;
|
||||
final int size = channel.calculateMessageSize(msg);
|
||||
incrementPendingOutboundBytes(size);
|
||||
|
||||
unflushed[unflushedCount] = msg;
|
||||
unflushedPromises[unflushedCount] = promise;
|
||||
unflushedTotals[unflushedCount] = size;
|
||||
this.unflushedCount = unflushedCount + 1;
|
||||
}
|
||||
|
||||
private void doubleUnflushedCapacity() {
|
||||
int newCapacity = unflushed.length << 1;
|
||||
if (newCapacity < 0) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
int unflushedCount = this.unflushedCount;
|
||||
|
||||
Object[] a1 = new Object[newCapacity];
|
||||
System.arraycopy(unflushed, 0, a1, 0, unflushedCount);
|
||||
unflushed = a1;
|
||||
|
||||
ChannelPromise[] a2 = new ChannelPromise[newCapacity];
|
||||
System.arraycopy(unflushedPromises, 0, a2, 0, unflushedCount);
|
||||
unflushedPromises = a2;
|
||||
|
||||
long[] a3 = new long[newCapacity];
|
||||
System.arraycopy(unflushedTotals, 0, a3, 0, unflushedCount);
|
||||
unflushedTotals = a3;
|
||||
}
|
||||
|
||||
void addFlush() {
|
||||
MessageList unflushedMessageList = this.unflushedMessageList;
|
||||
if (unflushedMessageList == null) {
|
||||
final int unflushedCount = this.unflushedCount;
|
||||
if (unflushedCount == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
Object[] unflushed = this.unflushed;
|
||||
ChannelPromise[] unflushedPromises = this.unflushedPromises;
|
||||
long[] unflushedTotals = this.unflushedTotals;
|
||||
|
||||
Object[] flushed = this.flushed;
|
||||
ChannelPromise[] flushedPromises = this.flushedPromises;
|
||||
long[] flushedProgresses = this.flushedProgresses;
|
||||
long[] flushedTotals = this.flushedTotals;
|
||||
int head = this.head;
|
||||
int tail = this.tail;
|
||||
|
||||
messageLists[tail] = unflushedMessageList;
|
||||
messageListSizes[tail] = unflushedMessageListSize;
|
||||
this.unflushedMessageList = null;
|
||||
unflushedMessageListSize = 0;
|
||||
|
||||
if ((this.tail = (tail + 1) & (messageLists.length - 1)) == head) {
|
||||
doubleCapacity();
|
||||
for (int i = 0; i < unflushedCount; i ++) {
|
||||
flushed[tail] = unflushed[i];
|
||||
flushedPromises[tail] = unflushedPromises[i];
|
||||
flushedProgresses[tail] = 0;
|
||||
flushedTotals[tail] = unflushedTotals[i];
|
||||
if ((tail = (tail + 1) & (flushed.length - 1)) == head) {
|
||||
this.tail = tail;
|
||||
doubleFlushedCapacity();
|
||||
head = this.head;
|
||||
tail = this.tail;
|
||||
flushed = this.flushed;
|
||||
flushedPromises = this.flushedPromises;
|
||||
flushedProgresses = this.flushedProgresses;
|
||||
flushedTotals = this.flushedTotals;
|
||||
}
|
||||
}
|
||||
|
||||
Arrays.fill(unflushed, 0, unflushedCount, null);
|
||||
Arrays.fill(unflushedPromises, 0, unflushedCount, null);
|
||||
this.unflushedCount = 0;
|
||||
|
||||
this.tail = tail;
|
||||
}
|
||||
|
||||
private void doubleFlushedCapacity() {
|
||||
int p = head;
|
||||
int n = flushed.length;
|
||||
int r = n - p; // number of elements to the right of p
|
||||
int newCapacity = n << 1;
|
||||
if (newCapacity < 0) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
Object[] a1 = new Object[newCapacity];
|
||||
System.arraycopy(flushed, p, a1, 0, r);
|
||||
System.arraycopy(flushed, 0, a1, r, p);
|
||||
flushed = a1;
|
||||
|
||||
ChannelPromise[] a2 = new ChannelPromise[newCapacity];
|
||||
System.arraycopy(flushedPromises, p, a2, 0, r);
|
||||
System.arraycopy(flushedPromises, 0, a2, r, p);
|
||||
flushedPromises = a2;
|
||||
|
||||
long[] a3 = new long[newCapacity];
|
||||
System.arraycopy(flushedProgresses, p, a3, 0, r);
|
||||
System.arraycopy(flushedProgresses, 0, a3, r, p);
|
||||
flushedProgresses = a3;
|
||||
|
||||
long[] a4 = new long[newCapacity];
|
||||
System.arraycopy(flushedTotals, p, a4, 0, r);
|
||||
System.arraycopy(flushedTotals, 0, a4, r, p);
|
||||
flushedTotals = a4;
|
||||
|
||||
head = 0;
|
||||
tail = n;
|
||||
}
|
||||
|
||||
private void incrementPendingOutboundBytes(int size) {
|
||||
@ -148,54 +238,58 @@ final class ChannelOutboundBuffer {
|
||||
}
|
||||
}
|
||||
|
||||
private void doubleCapacity() {
|
||||
assert head == tail;
|
||||
|
||||
int p = head;
|
||||
int n = messageLists.length;
|
||||
int r = n - p; // number of elements to the right of p
|
||||
int newCapacity = n << 1;
|
||||
if (newCapacity < 0) {
|
||||
throw new IllegalStateException("Sorry, deque too big");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
MessageList[] a1 = new MessageList[newCapacity];
|
||||
System.arraycopy(messageLists, p, a1, 0, r);
|
||||
System.arraycopy(messageLists, 0, a1, r, p);
|
||||
messageLists = a1;
|
||||
|
||||
long[] a2 = new long[newCapacity];
|
||||
System.arraycopy(messageListSizes, p, a2, 0, r);
|
||||
System.arraycopy(messageListSizes, 0, a2, r, p);
|
||||
messageListSizes = a2;
|
||||
|
||||
head = 0;
|
||||
tail = n;
|
||||
public Object current() {
|
||||
return flushed[head];
|
||||
}
|
||||
|
||||
boolean next() {
|
||||
// FIXME: pendingOutboundBytes should be decreased when the messages are flushed.
|
||||
public void progress(long amount) {
|
||||
int head = this.head;
|
||||
ChannelPromise p = flushedPromises[head];
|
||||
if (p instanceof ChannelProgressivePromise) {
|
||||
long progress = flushedProgresses[head] + amount;
|
||||
flushedProgresses[head] = progress;
|
||||
((ChannelProgressivePromise) p).tryProgress(progress, flushedTotals[head]);
|
||||
}
|
||||
}
|
||||
|
||||
decrementPendingOutboundBytes(currentMessageListSize);
|
||||
public boolean remove() {
|
||||
int head = this.head;
|
||||
|
||||
int h = head;
|
||||
|
||||
MessageList e = messageLists[h]; // Element is null if deque empty
|
||||
if (e == null) {
|
||||
currentMessageListSize = 0;
|
||||
currentMessageList = null;
|
||||
Object msg = flushed[head];
|
||||
if (msg == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
currentMessageList = messageLists[h];
|
||||
currentMessageIndex = 0;
|
||||
currentMessageListSize = messageListSizes[h];
|
||||
safeRelease(msg);
|
||||
flushed[head] = null;
|
||||
|
||||
messageLists[h] = null;
|
||||
messageListSizes[h] = 0;
|
||||
ChannelPromise promise = flushedPromises[head];
|
||||
promise.trySuccess();
|
||||
flushedPromises[head] = null;
|
||||
|
||||
head = h + 1 & messageLists.length - 1;
|
||||
decrementPendingOutboundBytes(flushedTotals[head]);
|
||||
|
||||
this.head = head + 1 & flushed.length - 1;
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean remove(Throwable cause) {
|
||||
int head = this.head;
|
||||
|
||||
Object msg = flushed[head];
|
||||
if (msg == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
safeRelease(msg);
|
||||
flushed[head] = null;
|
||||
|
||||
safeFail(flushedPromises[head], cause);
|
||||
flushedPromises[head] = null;
|
||||
|
||||
decrementPendingOutboundBytes(flushedTotals[head]);
|
||||
|
||||
this.head = head + 1 & flushed.length - 1;
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -203,43 +297,38 @@ final class ChannelOutboundBuffer {
|
||||
return WRITABLE_UPDATER.get(this) == 1;
|
||||
}
|
||||
|
||||
int size() {
|
||||
return tail - head & messageLists.length - 1;
|
||||
public int size() {
|
||||
return tail - head & flushed.length - 1;
|
||||
}
|
||||
|
||||
boolean isEmpty() {
|
||||
public boolean isEmpty() {
|
||||
return head == tail;
|
||||
}
|
||||
|
||||
void clearUnflushed(Throwable cause) {
|
||||
void failUnflushed(Throwable cause) {
|
||||
if (inFail) {
|
||||
return;
|
||||
}
|
||||
|
||||
MessageList unflushed = unflushedMessageList;
|
||||
if (unflushed == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
inFail = true;
|
||||
|
||||
// Release all unflushed messages.
|
||||
Object[] messages = unflushed.messages();
|
||||
ChannelPromise[] promises = unflushed.promises();
|
||||
final int size = unflushed.size();
|
||||
Object[] unflushed = this.unflushed;
|
||||
ChannelPromise[] unflushedPromises = this.unflushedPromises;
|
||||
long[] unflushedTotals = this.unflushedTotals;
|
||||
final int unflushedCount = this.unflushedCount;
|
||||
try {
|
||||
for (int i = 0; i < size; i++) {
|
||||
safeRelease(messages[i], promises[i], cause);
|
||||
for (int i = 0; i < unflushedCount; i++) {
|
||||
safeRelease(unflushed[i]);
|
||||
safeFail(unflushedPromises[i], cause);
|
||||
decrementPendingOutboundBytes(unflushedTotals[i]);
|
||||
}
|
||||
} finally {
|
||||
unflushed.recycle();
|
||||
decrementPendingOutboundBytes(unflushedMessageListSize);
|
||||
unflushedMessageListSize = 0;
|
||||
inFail = false;
|
||||
}
|
||||
}
|
||||
|
||||
void fail(Throwable cause) {
|
||||
void failFlushed(Throwable cause) {
|
||||
// Make sure that this method does not reenter. A listener added to the current promise can be notified by the
|
||||
// current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
|
||||
// indirectly (usually by closing the channel.)
|
||||
@ -251,49 +340,27 @@ final class ChannelOutboundBuffer {
|
||||
|
||||
try {
|
||||
inFail = true;
|
||||
if (currentMessageList == null) {
|
||||
if (!next()) {
|
||||
return;
|
||||
for (;;) {
|
||||
if (!remove(cause)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
do {
|
||||
if (currentMessageList != null) {
|
||||
// Store a local reference of current messages
|
||||
// This is needed as a promise may have a listener attached that will close the channel
|
||||
// The close will call next() which will set currentMessages to null and so
|
||||
// trigger a NPE in the finally block if no local reference is used.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/1573
|
||||
MessageList current = currentMessageList;
|
||||
// Release all failed messages.
|
||||
Object[] messages = current.messages();
|
||||
ChannelPromise[] promises = current.promises();
|
||||
final int size = current.size();
|
||||
try {
|
||||
for (int i = currentMessageIndex; i < size; i++) {
|
||||
safeRelease(messages[i], promises[i], cause);
|
||||
}
|
||||
} finally {
|
||||
current.recycle();
|
||||
}
|
||||
}
|
||||
} while(next());
|
||||
} finally {
|
||||
inFail = false;
|
||||
}
|
||||
}
|
||||
|
||||
private static void safeRelease(Object message, ChannelPromise promise, Throwable cause) {
|
||||
private static void safeRelease(Object message) {
|
||||
try {
|
||||
ReferenceCountUtil.release(message);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("Failed to release a message.", t);
|
||||
}
|
||||
}
|
||||
|
||||
ChannelPromise p = promise;
|
||||
if (!(p instanceof VoidChannelPromise) && !p.tryFailure(cause)) {
|
||||
logger.warn("Promise done already: {} - new exception is:", p, cause);
|
||||
private static void safeFail(ChannelPromise promise, Throwable cause) {
|
||||
if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
|
||||
logger.warn("Promise done already: {} - new exception is:", promise, cause);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -346,11 +346,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
throw new NullPointerException("msg");
|
||||
}
|
||||
|
||||
// FIXME: Remove once refactoring is done.
|
||||
if (msg instanceof MessageList) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
final DefaultChannelHandlerContext next = findContextInbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (executor.inEventLoop()) {
|
||||
|
@ -1,145 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.util.Recycler;
|
||||
import io.netty.util.Recycler.Handle;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
* A simple array-backed list that holds one or more messages.
|
||||
*/
|
||||
final class MessageList {
|
||||
|
||||
private static final int DEFAULT_INITIAL_CAPACITY = 8;
|
||||
private static final int MIN_INITIAL_CAPACITY = 4;
|
||||
|
||||
private static final Recycler<MessageList> RECYCLER = new Recycler<MessageList>() {
|
||||
@Override
|
||||
protected MessageList newObject(Handle handle) {
|
||||
return new MessageList(handle);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Create a new empty {@link MessageList} instance.
|
||||
*/
|
||||
static MessageList newInstance() {
|
||||
MessageList ret = RECYCLER.get();
|
||||
return ret;
|
||||
}
|
||||
|
||||
private final Handle handle;
|
||||
private Object[] messages;
|
||||
private ChannelPromise[] promises;
|
||||
private int size;
|
||||
|
||||
MessageList(Handle handle) {
|
||||
this(handle, DEFAULT_INITIAL_CAPACITY);
|
||||
}
|
||||
|
||||
MessageList(Handle handle, int initialCapacity) {
|
||||
this.handle = handle;
|
||||
initialCapacity = normalizeCapacity(initialCapacity);
|
||||
messages = new Object[initialCapacity];
|
||||
promises = new ChannelPromise[initialCapacity];
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the current size of this {@link MessageList} and so how many messages it holds.
|
||||
*/
|
||||
int size() {
|
||||
return size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return {@code true} if this {@link MessageList} is empty and so contains no messages.
|
||||
*/
|
||||
boolean isEmpty() {
|
||||
return size == 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the message to this {@link MessageList} and return itself.
|
||||
*/
|
||||
MessageList add(Object message, ChannelPromise promise) {
|
||||
int oldSize = size;
|
||||
int newSize = oldSize + 1;
|
||||
ensureCapacity(newSize);
|
||||
messages[oldSize] = message;
|
||||
promises[oldSize] = promise;
|
||||
size = newSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the backing array of this list.
|
||||
*/
|
||||
Object[] messages() {
|
||||
return messages;
|
||||
}
|
||||
|
||||
ChannelPromise[] promises() {
|
||||
return promises;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear and recycle this instance.
|
||||
*/
|
||||
boolean recycle() {
|
||||
Arrays.fill(messages, 0, size, null);
|
||||
Arrays.fill(promises, 0, size, null);
|
||||
size = 0;
|
||||
return RECYCLER.recycle(this, handle);
|
||||
}
|
||||
|
||||
private void ensureCapacity(int capacity) {
|
||||
if (messages.length >= capacity) {
|
||||
return;
|
||||
}
|
||||
|
||||
final int size = this.size;
|
||||
capacity = normalizeCapacity(capacity);
|
||||
|
||||
Object[] newMessages = new Object[capacity];
|
||||
System.arraycopy(messages, 0, newMessages, 0, size);
|
||||
messages = newMessages;
|
||||
|
||||
ChannelPromise[] newPromises = new ChannelPromise[capacity];
|
||||
System.arraycopy(promises, 0, newPromises, 0, size);
|
||||
promises = newPromises;
|
||||
}
|
||||
|
||||
private static int normalizeCapacity(int initialCapacity) {
|
||||
if (initialCapacity <= MIN_INITIAL_CAPACITY) {
|
||||
initialCapacity = MIN_INITIAL_CAPACITY;
|
||||
} else {
|
||||
initialCapacity |= initialCapacity >>> 1;
|
||||
initialCapacity |= initialCapacity >>> 2;
|
||||
initialCapacity |= initialCapacity >>> 4;
|
||||
initialCapacity |= initialCapacity >>> 8;
|
||||
initialCapacity |= initialCapacity >>> 16;
|
||||
initialCapacity ++;
|
||||
|
||||
if (initialCapacity < 0) {
|
||||
initialCapacity >>>= 1;
|
||||
}
|
||||
}
|
||||
return initialCapacity;
|
||||
}
|
||||
}
|
@ -23,10 +23,12 @@ import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.DefaultChannelConfig;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.RecyclableArrayList;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
@ -305,11 +307,17 @@ public class EmbeddedChannel extends AbstractChannel {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception {
|
||||
for (int i = startIndex; i < msgsLength; i ++) {
|
||||
lastOutboundBuffer.add(msgs[i]);
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||
for (;;) {
|
||||
Object msg = in.current();
|
||||
if (msg == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
ReferenceCountUtil.retain(msg);
|
||||
lastOutboundBuffer.add(msg);
|
||||
in.remove();
|
||||
}
|
||||
return msgsLength - startIndex;
|
||||
}
|
||||
|
||||
private class DefaultUnsafe extends AbstractUnsafe {
|
||||
|
@ -20,11 +20,13 @@ import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.DefaultChannelConfig;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.SingleThreadEventLoop;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.concurrent.SingleThreadEventExecutor;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
@ -265,7 +267,7 @@ public class LocalChannel extends AbstractChannel {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception {
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||
if (state < 2) {
|
||||
throw new NotYetConnectedException();
|
||||
}
|
||||
@ -278,14 +280,23 @@ public class LocalChannel extends AbstractChannel {
|
||||
final EventLoop peerLoop = peer.eventLoop();
|
||||
|
||||
if (peerLoop == eventLoop()) {
|
||||
for (int i = startIndex; i < msgsLength; i ++) {
|
||||
peer.inboundBuffer.add(msgs[i]);
|
||||
for (;;) {
|
||||
Object msg = in.current();
|
||||
if (msg == null) {
|
||||
break;
|
||||
}
|
||||
peer.inboundBuffer.add(msg);
|
||||
ReferenceCountUtil.retain(msg);
|
||||
in.remove();
|
||||
}
|
||||
finishPeerRead(peer, peerPipeline);
|
||||
} else {
|
||||
// Use a copy because the original msgs will be recycled by AbstractChannel.
|
||||
final Object[] msgsCopy = new Object[msgsLength - startIndex];
|
||||
System.arraycopy(msgs, startIndex, msgsCopy, 0, msgsCopy.length);
|
||||
final Object[] msgsCopy = new Object[in.size()];
|
||||
for (int i = 0; i < msgsCopy.length; i ++) {
|
||||
msgsCopy[i] = ReferenceCountUtil.retain(in.current());
|
||||
in.remove();
|
||||
}
|
||||
|
||||
peerLoop.execute(new Runnable() {
|
||||
@Override
|
||||
@ -297,8 +308,6 @@ public class LocalChannel extends AbstractChannel {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return msgsLength - startIndex;
|
||||
}
|
||||
|
||||
private static void finishPeerRead(LocalChannel peer, ChannelPipeline peerPipeline) {
|
||||
|
@ -20,6 +20,7 @@ import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.FileRegion;
|
||||
import io.netty.channel.RecvByteBufAllocator;
|
||||
@ -140,23 +141,29 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception {
|
||||
int writeIndex = startIndex;
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||
final SelectionKey key = selectionKey();
|
||||
final int interestOps = key.interestOps();
|
||||
for (;;) {
|
||||
if (writeIndex >= msgsLength) {
|
||||
Object msg = in.current();
|
||||
if (msg == null) {
|
||||
// Wrote all messages.
|
||||
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
|
||||
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
|
||||
}
|
||||
break;
|
||||
}
|
||||
Object msg = msgs[writeIndex];
|
||||
|
||||
if (msg instanceof ByteBuf) {
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
if (!buf.isReadable()) {
|
||||
buf.release();
|
||||
writeIndex++;
|
||||
in.remove();
|
||||
continue;
|
||||
}
|
||||
|
||||
boolean done = false;
|
||||
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
|
||||
int localFlushedAmount = doWriteBytes(buf, i == 0);
|
||||
int localFlushedAmount = doWriteBytes(buf);
|
||||
if (localFlushedAmount == 0) {
|
||||
break;
|
||||
}
|
||||
@ -168,16 +175,19 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
||||
}
|
||||
|
||||
if (done) {
|
||||
buf.release();
|
||||
writeIndex++;
|
||||
in.remove();
|
||||
} else {
|
||||
// Did not write completely.
|
||||
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
|
||||
key.interestOps(interestOps | SelectionKey.OP_WRITE);
|
||||
}
|
||||
break;
|
||||
}
|
||||
} else if (msg instanceof FileRegion) {
|
||||
FileRegion region = (FileRegion) msg;
|
||||
boolean done = false;
|
||||
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
|
||||
long localFlushedAmount = doWriteFileRegion(region, i == 0);
|
||||
long localFlushedAmount = doWriteFileRegion(region);
|
||||
if (localFlushedAmount == 0) {
|
||||
break;
|
||||
}
|
||||
@ -188,27 +198,27 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
||||
}
|
||||
|
||||
if (done) {
|
||||
region.release();
|
||||
writeIndex++;
|
||||
in.remove();
|
||||
} else {
|
||||
// Did not write completely.
|
||||
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
|
||||
key.interestOps(interestOps | SelectionKey.OP_WRITE);
|
||||
}
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
|
||||
}
|
||||
}
|
||||
return writeIndex - startIndex;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a {@link FileRegion}
|
||||
*
|
||||
* @param region the {@link FileRegion} from which the bytes should be written
|
||||
* @param lastSpin {@code true} if this is the last write try
|
||||
* @return amount the amount of written bytes
|
||||
* @throws Exception thrown if an error accour
|
||||
*/
|
||||
protected abstract long doWriteFileRegion(FileRegion region, boolean lastSpin) throws Exception;
|
||||
protected abstract long doWriteFileRegion(FileRegion region) throws Exception;
|
||||
|
||||
/**
|
||||
* Read bytes into the given {@link ByteBuf} and return the amount.
|
||||
@ -218,11 +228,9 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
||||
/**
|
||||
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
|
||||
* @param buf the {@link ByteBuf} from which the bytes should be written
|
||||
* @param lastSpin {@code true} if this is the last write try
|
||||
* @return amount the amount of written bytes
|
||||
* @throws Exception thrown if an error accour
|
||||
*/
|
||||
protected abstract int doWriteBytes(ByteBuf buf, boolean lastSpin) throws Exception;
|
||||
protected abstract int doWriteBytes(ByteBuf buf) throws Exception;
|
||||
|
||||
protected final void updateOpWrite(long expectedWrittenBytes, long writtenBytes, boolean lastSpin) {
|
||||
if (writtenBytes >= expectedWrittenBytes) {
|
||||
|
@ -17,6 +17,7 @@ package io.netty.channel.nio;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ServerChannel;
|
||||
|
||||
@ -109,15 +110,38 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception {
|
||||
final int writeSpinCount = config().getWriteSpinCount() - 1;
|
||||
for (int i = writeSpinCount; i >= 0; i --) {
|
||||
int written = doWriteMessages(msgs, msgsLength, startIndex, i == 0);
|
||||
if (written > 0) {
|
||||
return written;
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||
final SelectionKey key = selectionKey();
|
||||
final int interestOps = key.interestOps();
|
||||
|
||||
for (;;) {
|
||||
Object msg = in.current();
|
||||
if (msg == null) {
|
||||
// Wrote all messages.
|
||||
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
|
||||
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
boolean done = false;
|
||||
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
|
||||
if (doWriteMessage(msg)) {
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (done) {
|
||||
in.remove();
|
||||
} else {
|
||||
// Did not write all messages.
|
||||
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
|
||||
key.interestOps(interestOps | SelectionKey.OP_WRITE);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -126,10 +150,9 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
|
||||
protected abstract int doReadMessages(List<Object> buf) throws Exception;
|
||||
|
||||
/**
|
||||
* Write messages to the underlying {@link java.nio.channels.Channel}.
|
||||
* @param lastSpin {@code true} if this is the last write try
|
||||
* @return the number of written messages
|
||||
* Write a message to the underlying {@link java.nio.channels.Channel}.
|
||||
*
|
||||
* @return {@code true} if and only if the message has been written
|
||||
*/
|
||||
protected abstract int doWriteMessages(
|
||||
Object[] msgs, int msgLength, int startIndex, boolean lastSpin) throws Exception;
|
||||
protected abstract boolean doWriteMessage(Object msg) throws Exception;
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.FileRegion;
|
||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||
@ -152,30 +153,27 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception {
|
||||
int writeIndex = startIndex;
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||
for (;;) {
|
||||
if (writeIndex >= msgsLength) {
|
||||
Object msg = in.current();
|
||||
if (in == null) {
|
||||
break;
|
||||
}
|
||||
Object msg = msgs[writeIndex];
|
||||
|
||||
if (msg instanceof ByteBuf) {
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
while (buf.isReadable()) {
|
||||
doWriteBytes(buf);
|
||||
}
|
||||
buf.release();
|
||||
writeIndex++;
|
||||
in.remove();
|
||||
} else if (msg instanceof FileRegion) {
|
||||
FileRegion region = (FileRegion) msg;
|
||||
doWriteFileRegion(region);
|
||||
region.release();
|
||||
writeIndex++;
|
||||
doWriteFileRegion((FileRegion) msg);
|
||||
in.remove();
|
||||
} else {
|
||||
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
|
||||
in.remove(new UnsupportedOperationException(
|
||||
"unsupported message type: " + StringUtil.simpleClassName(msg)));
|
||||
}
|
||||
}
|
||||
return writeIndex - startIndex;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -28,7 +28,6 @@ import io.netty.channel.nio.AbstractNioMessageChannel;
|
||||
import io.netty.channel.socket.DatagramChannelConfig;
|
||||
import io.netty.channel.socket.DatagramPacket;
|
||||
import io.netty.channel.socket.InternetProtocolFamily;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
|
||||
@ -224,18 +223,17 @@ public final class NioDatagramChannel
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWriteMessages(Object[] msgs, int msgsLength, int startIndex, boolean lastSpin) throws Exception {
|
||||
final Object o = msgs[startIndex];
|
||||
protected boolean doWriteMessage(Object msg) throws Exception {
|
||||
final Object m;
|
||||
final ByteBuf data;
|
||||
final SocketAddress remoteAddress;
|
||||
if (o instanceof AddressedEnvelope) {
|
||||
if (msg instanceof AddressedEnvelope) {
|
||||
@SuppressWarnings("unchecked")
|
||||
AddressedEnvelope<Object, SocketAddress> envelope = (AddressedEnvelope<Object, SocketAddress>) o;
|
||||
AddressedEnvelope<Object, SocketAddress> envelope = (AddressedEnvelope<Object, SocketAddress>) msg;
|
||||
remoteAddress = envelope.recipient();
|
||||
m = envelope.content();
|
||||
} else {
|
||||
m = o;
|
||||
m = msg;
|
||||
remoteAddress = null;
|
||||
}
|
||||
|
||||
@ -244,10 +242,14 @@ public final class NioDatagramChannel
|
||||
} else if (m instanceof ByteBuf) {
|
||||
data = (ByteBuf) m;
|
||||
} else {
|
||||
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(0));
|
||||
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
|
||||
}
|
||||
|
||||
int dataLen = data.readableBytes();
|
||||
if (dataLen == 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
ByteBuffer nioData;
|
||||
if (data.nioBufferCount() == 1) {
|
||||
nioData = data.nioBuffer();
|
||||
@ -264,32 +266,7 @@ public final class NioDatagramChannel
|
||||
writtenBytes = javaChannel().write(nioData);
|
||||
}
|
||||
|
||||
final SelectionKey key = selectionKey();
|
||||
final int interestOps = key.interestOps();
|
||||
if (writtenBytes <= 0 && dataLen > 0) {
|
||||
// Did not write a packet.
|
||||
// 1) If 'lastSpin' is false, the caller will call this method again real soon.
|
||||
// - Do not update OP_WRITE.
|
||||
// 2) If 'lastSpin' is true, the caller will not retry.
|
||||
// - Set OP_WRITE so that the event loop calls flushForcibly() later.
|
||||
if (lastSpin) {
|
||||
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
|
||||
key.interestOps(interestOps | SelectionKey.OP_WRITE);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Wrote a packet - free the message.
|
||||
ReferenceCountUtil.release(o);
|
||||
|
||||
if (startIndex + 1 == msgsLength) {
|
||||
// Wrote the outbound buffer completely - clear OP_WRITE.
|
||||
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
|
||||
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
|
||||
}
|
||||
}
|
||||
return 1;
|
||||
return writtenBytes > 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -151,7 +151,7 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWriteMessages(Object[] msgs, int msgsLength, int startIndex, boolean lastSpin) throws Exception {
|
||||
protected boolean doWriteMessage(Object msg) throws Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.FileRegion;
|
||||
@ -27,7 +28,6 @@ import io.netty.channel.nio.AbstractNioByteChannel;
|
||||
import io.netty.channel.socket.DefaultSocketChannelConfig;
|
||||
import io.netty.channel.socket.ServerSocketChannel;
|
||||
import io.netty.channel.socket.SocketChannelConfig;
|
||||
import io.netty.util.ReferenceCounted;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
@ -247,27 +247,29 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWriteBytes(ByteBuf buf, boolean lastSpin) throws Exception {
|
||||
protected int doWriteBytes(ByteBuf buf) throws Exception {
|
||||
final int expectedWrittenBytes = buf.readableBytes();
|
||||
final int writtenBytes = buf.readBytes(javaChannel(), expectedWrittenBytes);
|
||||
updateOpWrite(expectedWrittenBytes, writtenBytes, lastSpin);
|
||||
return writtenBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long doWriteFileRegion(FileRegion region, boolean lastSpin) throws Exception {
|
||||
protected long doWriteFileRegion(FileRegion region) throws Exception {
|
||||
final long position = region.transfered();
|
||||
final long expectedWrittenBytes = region.count() - position;
|
||||
final long writtenBytes = region.transferTo(javaChannel(), position);
|
||||
updateOpWrite(expectedWrittenBytes, writtenBytes, lastSpin);
|
||||
return writtenBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWrite(Object[] msgs, int msgsLength, final int startIndex) throws Exception {
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||
// FIXME: Re-enable gathering write.
|
||||
super.doWrite(in);
|
||||
|
||||
/*
|
||||
// Do non-gathering write for a single buffer case.
|
||||
if (msgsLength <= 1) {
|
||||
return super.doWrite(msgs, msgsLength, startIndex);
|
||||
if (in.size() <= 1) {
|
||||
super.doWrite(in);
|
||||
return;
|
||||
}
|
||||
|
||||
ByteBuffer[] nioBuffers = getNioBufferArray();
|
||||
@ -363,5 +365,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
|
||||
}
|
||||
return writtenBufs;
|
||||
}
|
||||
*/
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.RecvByteBufAllocator;
|
||||
import io.netty.channel.oio.AbstractOioMessageChannel;
|
||||
@ -29,7 +30,6 @@ import io.netty.channel.socket.DatagramChannel;
|
||||
import io.netty.channel.socket.DatagramChannelConfig;
|
||||
import io.netty.channel.socket.DatagramPacket;
|
||||
import io.netty.channel.socket.DefaultDatagramChannelConfig;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.internal.EmptyArrays;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
@ -233,43 +233,48 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception {
|
||||
final Object o = msgs[startIndex];
|
||||
final Object m;
|
||||
final ByteBuf data;
|
||||
final SocketAddress remoteAddress;
|
||||
if (o instanceof AddressedEnvelope) {
|
||||
@SuppressWarnings("unchecked")
|
||||
AddressedEnvelope<Object, SocketAddress> envelope = (AddressedEnvelope<Object, SocketAddress>) o;
|
||||
remoteAddress = envelope.recipient();
|
||||
m = envelope.content();
|
||||
} else {
|
||||
m = o;
|
||||
remoteAddress = null;
|
||||
}
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||
for (;;) {
|
||||
final Object o = in.current();
|
||||
if (o == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (m instanceof ByteBufHolder) {
|
||||
data = ((ByteBufHolder) m).content();
|
||||
} else if (m instanceof ByteBuf) {
|
||||
data = (ByteBuf) m;
|
||||
} else {
|
||||
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(o));
|
||||
}
|
||||
final Object m;
|
||||
final ByteBuf data;
|
||||
final SocketAddress remoteAddress;
|
||||
if (o instanceof AddressedEnvelope) {
|
||||
@SuppressWarnings("unchecked")
|
||||
AddressedEnvelope<Object, SocketAddress> envelope = (AddressedEnvelope<Object, SocketAddress>) o;
|
||||
remoteAddress = envelope.recipient();
|
||||
m = envelope.content();
|
||||
} else {
|
||||
m = o;
|
||||
remoteAddress = null;
|
||||
}
|
||||
|
||||
int length = data.readableBytes();
|
||||
if (remoteAddress != null) {
|
||||
tmpPacket.setSocketAddress(remoteAddress);
|
||||
if (m instanceof ByteBufHolder) {
|
||||
data = ((ByteBufHolder) m).content();
|
||||
} else if (m instanceof ByteBuf) {
|
||||
data = (ByteBuf) m;
|
||||
} else {
|
||||
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(o));
|
||||
}
|
||||
|
||||
int length = data.readableBytes();
|
||||
if (remoteAddress != null) {
|
||||
tmpPacket.setSocketAddress(remoteAddress);
|
||||
}
|
||||
if (data.hasArray()) {
|
||||
tmpPacket.setData(data.array(), data.arrayOffset() + data.readerIndex(), length);
|
||||
} else {
|
||||
byte[] tmp = new byte[length];
|
||||
data.getBytes(data.readerIndex(), tmp);
|
||||
tmpPacket.setData(tmp);
|
||||
}
|
||||
socket.send(tmpPacket);
|
||||
in.remove();
|
||||
}
|
||||
if (data.hasArray()) {
|
||||
tmpPacket.setData(data.array(), data.arrayOffset() + data.readerIndex(), length);
|
||||
} else {
|
||||
byte[] tmp = new byte[length];
|
||||
data.getBytes(data.readerIndex(), tmp);
|
||||
tmpPacket.setData(tmp);
|
||||
}
|
||||
socket.send(tmpPacket);
|
||||
ReferenceCountUtil.release(o);
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -17,6 +17,7 @@ package io.netty.channel.socket.oio;
|
||||
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.oio.AbstractOioMessageChannel;
|
||||
import io.netty.channel.socket.ServerSocketChannel;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
@ -173,7 +174,7 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception {
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
|
@ -23,6 +23,7 @@ import io.netty.channel.local.LocalAddress;
|
||||
import io.netty.channel.local.LocalChannel;
|
||||
import io.netty.channel.local.LocalEventLoopGroup;
|
||||
import io.netty.channel.local.LocalServerChannel;
|
||||
import io.netty.util.AbstractReferenceCounted;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.ReferenceCounted;
|
||||
import org.junit.After;
|
||||
@ -100,37 +101,10 @@ public class DefaultChannelPipelineTest {
|
||||
public void testFreeCalled() throws Exception {
|
||||
final CountDownLatch free = new CountDownLatch(1);
|
||||
|
||||
final ReferenceCounted holder = new ReferenceCounted() {
|
||||
final ReferenceCounted holder = new AbstractReferenceCounted() {
|
||||
@Override
|
||||
public int refCnt() {
|
||||
return (int) free.getCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReferenceCounted retain() {
|
||||
fail();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReferenceCounted retain(int increment) {
|
||||
fail();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean release() {
|
||||
assertEquals(1, refCnt());
|
||||
protected void deallocate() {
|
||||
free.countDown();
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean release(int decrement) {
|
||||
for (int i = 0; i < decrement; i ++) {
|
||||
release();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user