diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java index d29737fc5c..36e513b898 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java @@ -292,10 +292,14 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett } @Override - protected int doWriteMessages(Object[] msgs, int msgLength, int startIndex, boolean lastSpin) throws Exception { - SctpMessage packet = (SctpMessage) msgs[startIndex]; + protected boolean doWriteMessage(Object msg) throws Exception { + SctpMessage packet = (SctpMessage) msg; ByteBuf data = packet.content(); int dataLen = data.readableBytes(); + if (dataLen == 0) { + return true; + } + ByteBuffer nioData; if (data.nioBufferCount() == 1) { nioData = data.nioBuffer(); @@ -311,32 +315,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett final int writtenBytes = javaChannel().send(nioData, mi); - final SelectionKey key = selectionKey(); - final int interestOps = key.interestOps(); - if (writtenBytes <= 0 && dataLen > 0) { - // Did not write a packet. - // 1) If 'lastSpin' is false, the caller will call this method again real soon. - // - Do not update OP_WRITE. - // 2) If 'lastSpin' is true, the caller will not retry. - // - Set OP_WRITE so that the event loop calls flushForcibly() later. - if (lastSpin) { - if ((interestOps & SelectionKey.OP_WRITE) == 0) { - key.interestOps(interestOps | SelectionKey.OP_WRITE); - } - } - return 0; - } - - // packet was written free up buffer - packet.release(); - - if (msgLength == 1) { - // Wrote the outbound buffer completely - clear OP_WRITE. - if ((interestOps & SelectionKey.OP_WRITE) != 0) { - key.interestOps(interestOps & ~SelectionKey.OP_WRITE); - } - } - return 1; + return writtenBytes > 0; } @Override diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java index a14ce71553..188b08d761 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java @@ -216,7 +216,7 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel } @Override - protected int doWriteMessages(Object[] msgs, int msgLength, int startIndex, boolean lastSpin) throws Exception { + protected boolean doWriteMessage(Object msg) throws Exception { throw new UnsupportedOperationException(); } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java index f3005357b5..58c0f666aa 100755 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java @@ -24,6 +24,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.oio.AbstractOioMessageChannel; @@ -219,59 +220,57 @@ public class OioSctpChannel extends AbstractOioMessageChannel } @Override - protected int doWrite(Object[] msgs, int length, int startIndex) throws Exception { + protected void doWrite(ChannelOutboundBuffer in) throws Exception { if (!writeSelector.isOpen()) { - return 0; + return; } + final int size = in.size(); final int selectedKeys = writeSelector.select(SO_TIMEOUT); if (selectedKeys > 0) { final Set writableKeys = writeSelector.selectedKeys(); if (writableKeys.isEmpty()) { - return 0; + return; } Iterator writableKeysIt = writableKeys.iterator(); int written = 0; for (;;) { - if (written == length) { + if (written == size) { // all written - return written; + return; } writableKeysIt.next(); writableKeysIt.remove(); - SctpMessage packet = (SctpMessage) msgs[startIndex ++]; + SctpMessage packet = (SctpMessage) in.current(); if (packet == null) { - return written; + return; } - try { - ByteBuf data = packet.content(); - int dataLen = data.readableBytes(); - ByteBuffer nioData; - if (data.nioBufferCount() != -1) { - nioData = data.nioBuffer(); - } else { - nioData = ByteBuffer.allocate(dataLen); - data.getBytes(data.readerIndex(), nioData); - nioData.flip(); - } + ByteBuf data = packet.content(); + int dataLen = data.readableBytes(); + ByteBuffer nioData; - final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier()); - mi.payloadProtocolID(packet.protocolIdentifier()); - mi.streamNumber(packet.streamIdentifier()); - - ch.send(nioData, mi); - written ++; - } finally { - packet.release(); + if (data.nioBufferCount() != -1) { + nioData = data.nioBuffer(); + } else { + nioData = ByteBuffer.allocate(dataLen); + data.getBytes(data.readerIndex(), nioData); + nioData.flip(); } + + final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier()); + mi.payloadProtocolID(packet.protocolIdentifier()); + mi.streamNumber(packet.streamIdentifier()); + + ch.send(nioData, mi); + written ++; + in.remove(); + if (!writableKeysIt.hasNext()) { - return written; + return; } } } - - return 0; } @Override diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java index a2631bdd33..f787e7304d 100755 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java @@ -20,6 +20,7 @@ import com.sun.nio.sctp.SctpServerChannel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; import io.netty.channel.oio.AbstractOioMessageChannel; import io.netty.channel.sctp.DefaultSctpServerChannelConfig; @@ -285,7 +286,7 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel } @Override - protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception { + protected void doWrite(ChannelOutboundBuffer in) throws Exception { throw new UnsupportedOperationException(); } } diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java index b22a285885..aaff81e134 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java @@ -93,7 +93,7 @@ public abstract class NioUdtAcceptorChannel extends AbstractNioMessageChannel im } @Override - protected int doWriteMessages(Object[] msgs, int msgLength, int startIndex, boolean lastSpin) throws Exception { + protected boolean doWriteMessage(Object msg) throws Exception { throw new UnsupportedOperationException(); } diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteConnectorChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteConnectorChannel.java index cdb0f04487..fe17d789b8 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteConnectorChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteConnectorChannel.java @@ -140,15 +140,14 @@ public class NioUdtByteConnectorChannel extends AbstractNioByteChannel implement } @Override - protected int doWriteBytes(final ByteBuf byteBuf, final boolean lastSpin) throws Exception { + protected int doWriteBytes(final ByteBuf byteBuf) throws Exception { final int expectedWrittenBytes = byteBuf.readableBytes(); final int writtenBytes = byteBuf.readBytes(javaChannel(), expectedWrittenBytes); - updateOpWrite(expectedWrittenBytes, writtenBytes, lastSpin); return writtenBytes; } @Override - protected long doWriteFileRegion(FileRegion region, boolean lastSpin) throws Exception { + protected long doWriteFileRegion(FileRegion region) throws Exception { throw new UnsupportedOperationException(); } diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageConnectorChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageConnectorChannel.java index b81c121756..eb135cda75 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageConnectorChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageConnectorChannel.java @@ -31,7 +31,6 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.nio.channels.SelectionKey; import java.util.List; import static java.nio.channels.SelectionKey.*; @@ -167,9 +166,9 @@ public class NioUdtMessageConnectorChannel extends AbstractNioMessageChannel imp } @Override - protected int doWriteMessages(Object[] msgs, int msgLength, int startIndex, boolean lastSpin) throws Exception { + protected boolean doWriteMessage(Object msg) throws Exception { // expects a message - final UdtMessage message = (UdtMessage) msgs[startIndex]; + final UdtMessage message = (UdtMessage) msg; final ByteBuf byteBuf = message.content(); @@ -182,17 +181,9 @@ public class NioUdtMessageConnectorChannel extends AbstractNioMessageChannel imp writtenBytes = javaChannel().write(byteBuf.nioBuffers()); } - final SelectionKey key = selectionKey(); - final int interestOps = key.interestOps(); - // did not write the message if (writtenBytes <= 0 && messageSize > 0) { - if (lastSpin) { - if ((interestOps & OP_WRITE) == 0) { - key.interestOps(interestOps | OP_WRITE); - } - } - return 0; + return false; } // wrote message completely @@ -201,16 +192,7 @@ public class NioUdtMessageConnectorChannel extends AbstractNioMessageChannel imp "Provider error: failed to write message. Provider library should be upgraded."); } - // wrote the message queue completely - clear OP_WRITE. - if (msgLength == 1) { - if ((interestOps & OP_WRITE) != 0) { - key.interestOps(interestOps & ~OP_WRITE); - } - } - - message.release(); - - return 1; + return true; } @Override diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 7f4c3b1c4b..c1d854b87a 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -516,12 +516,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha promise.setFailure(t); } - if (!outboundBuffer.isEmpty()) { - // fail all queued messages - outboundBuffer.fail(CLOSED_CHANNEL_EXCEPTION); - } - - outboundBuffer.clearUnflushed(CLOSED_CHANNEL_EXCEPTION); + // fail all queued messages + outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); + outboundBuffer.failUnflushed(CLOSED_CHANNEL_EXCEPTION); if (wasActive && !isActive()) { invokeLater(new Runnable() { @@ -628,59 +625,28 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return; } - inFlush0 = true; - final ChannelOutboundBuffer outboundBuffer = AbstractChannel.this.outboundBuffer; + if (outboundBuffer.isEmpty()) { + return; + } + + inFlush0 = true; // Mark all pending write requests as failure if the channel is inactive. if (!isActive()) { if (isOpen()) { - outboundBuffer.fail(NOT_YET_CONNECTED_EXCEPTION); + outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION); } else { - outboundBuffer.fail(CLOSED_CHANNEL_EXCEPTION); + outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); } inFlush0 = false; return; } try { - for (;;) { - MessageList messages = outboundBuffer.currentMessageList; - if (messages == null) { - if (!outboundBuffer.next()) { - break; - } - messages = outboundBuffer.currentMessageList; - } - - int messageIndex = outboundBuffer.currentMessageIndex; - int messageCount = messages.size(); - Object[] messageArray = messages.messages(); - ChannelPromise[] promiseArray = messages.promises(); - - // Write the messages. - final int writtenMessages = doWrite(messageArray, messageCount, messageIndex); - - // Notify the promises. - final int newMessageIndex = messageIndex + writtenMessages; - for (int i = messageIndex; i < newMessageIndex; i ++) { - promiseArray[i].trySuccess(); - } - - // Update the index variable and decide what to do next. - outboundBuffer.currentMessageIndex = messageIndex = newMessageIndex; - if (messageIndex >= messageCount) { - messages.recycle(); - if (!outboundBuffer.next()) { - break; - } - } else { - // Could not flush the current write request completely. Try again later. - break; - } - } + doWrite(outboundBuffer); } catch (Throwable t) { - outboundBuffer.fail(t); + outboundBuffer.failFlushed(t); if (t instanceof IOException) { close(voidPromise()); } @@ -790,13 +756,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha protected abstract void doBeginRead() throws Exception; /** - * Flush the content of the given {@link ByteBuf} to the remote peer. + * Flush the content of the given buffer to the remote peer. * * Sub-classes may override this as this implementation will just thrown an {@link UnsupportedOperationException} - * - * @return the number of written messages */ - protected abstract int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception; + protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception; protected static void checkEOF(FileRegion region) throws IOException { if (region.transfered() < region.count()) { diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index 14593da040..5835e2b77a 100644 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -67,7 +67,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S } @Override - protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception { + protected void doWrite(ChannelOutboundBuffer in) throws Exception { throw new UnsupportedOperationException(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index c52794efd7..5beb91b518 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -23,35 +23,38 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -final class ChannelOutboundBuffer { +public final class ChannelOutboundBuffer { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class); private static final int MIN_INITIAL_CAPACITY = 8; - MessageList currentMessageList; - int currentMessageIndex; - private long currentMessageListSize; - - private MessageList[] messageLists; - private long[] messageListSizes; - - private MessageList unflushedMessageList; - private long unflushedMessageListSize; - - private int head; - private int tail; - private boolean inFail; private final AbstractChannel channel; + // Flushed messages are stored in a circulas queue. + private Object[] flushed; + private ChannelPromise[] flushedPromises; + private long[] flushedProgresses; + private long[] flushedTotals; + private int head; + private int tail; + + // Unflushed messages are stored in an array list. + private Object[] unflushed; + private ChannelPromise[] unflushedPromises; + private long[] unflushedTotals; + private int unflushedCount; + + private boolean inFail; private long pendingOutboundBytes; private static final AtomicIntegerFieldUpdater WRITABLE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "writable"); - @SuppressWarnings("unused") + @SuppressWarnings({ "unused", "FieldMayBeFinal" }) private volatile int writable = 1; ChannelOutboundBuffer(AbstractChannel channel) { @@ -82,39 +85,126 @@ final class ChannelOutboundBuffer { this.channel = channel; - messageLists = new MessageList[initialCapacity]; - messageListSizes = new long[initialCapacity]; + flushed = new Object[initialCapacity]; + flushedPromises = new ChannelPromise[initialCapacity]; + flushedProgresses = new long[initialCapacity]; + flushedTotals = new long[initialCapacity]; + + unflushed = new Object[initialCapacity]; + unflushedPromises = new ChannelPromise[initialCapacity]; + unflushedTotals = new long[initialCapacity]; } void addMessage(Object msg, ChannelPromise promise) { - MessageList unflushedMessageList = this.unflushedMessageList; - if (unflushedMessageList == null) { - this.unflushedMessageList = unflushedMessageList = MessageList.newInstance(); + Object[] unflushed = this.unflushed; + int unflushedCount = this.unflushedCount; + if (unflushedCount == unflushed.length - 1) { + doubleUnflushedCapacity(); + unflushed = this.unflushed; } - unflushedMessageList.add(msg, promise); - - int size = channel.calculateMessageSize(msg); - unflushedMessageListSize += size; + final int size = channel.calculateMessageSize(msg); incrementPendingOutboundBytes(size); + + unflushed[unflushedCount] = msg; + unflushedPromises[unflushedCount] = promise; + unflushedTotals[unflushedCount] = size; + this.unflushedCount = unflushedCount + 1; + } + + private void doubleUnflushedCapacity() { + int newCapacity = unflushed.length << 1; + if (newCapacity < 0) { + throw new IllegalStateException(); + } + + int unflushedCount = this.unflushedCount; + + Object[] a1 = new Object[newCapacity]; + System.arraycopy(unflushed, 0, a1, 0, unflushedCount); + unflushed = a1; + + ChannelPromise[] a2 = new ChannelPromise[newCapacity]; + System.arraycopy(unflushedPromises, 0, a2, 0, unflushedCount); + unflushedPromises = a2; + + long[] a3 = new long[newCapacity]; + System.arraycopy(unflushedTotals, 0, a3, 0, unflushedCount); + unflushedTotals = a3; } void addFlush() { - MessageList unflushedMessageList = this.unflushedMessageList; - if (unflushedMessageList == null) { + final int unflushedCount = this.unflushedCount; + if (unflushedCount == 0) { return; } + Object[] unflushed = this.unflushed; + ChannelPromise[] unflushedPromises = this.unflushedPromises; + long[] unflushedTotals = this.unflushedTotals; + + Object[] flushed = this.flushed; + ChannelPromise[] flushedPromises = this.flushedPromises; + long[] flushedProgresses = this.flushedProgresses; + long[] flushedTotals = this.flushedTotals; + int head = this.head; int tail = this.tail; - messageLists[tail] = unflushedMessageList; - messageListSizes[tail] = unflushedMessageListSize; - this.unflushedMessageList = null; - unflushedMessageListSize = 0; - - if ((this.tail = (tail + 1) & (messageLists.length - 1)) == head) { - doubleCapacity(); + for (int i = 0; i < unflushedCount; i ++) { + flushed[tail] = unflushed[i]; + flushedPromises[tail] = unflushedPromises[i]; + flushedProgresses[tail] = 0; + flushedTotals[tail] = unflushedTotals[i]; + if ((tail = (tail + 1) & (flushed.length - 1)) == head) { + this.tail = tail; + doubleFlushedCapacity(); + head = this.head; + tail = this.tail; + flushed = this.flushed; + flushedPromises = this.flushedPromises; + flushedProgresses = this.flushedProgresses; + flushedTotals = this.flushedTotals; + } } + + Arrays.fill(unflushed, 0, unflushedCount, null); + Arrays.fill(unflushedPromises, 0, unflushedCount, null); + this.unflushedCount = 0; + + this.tail = tail; + } + + private void doubleFlushedCapacity() { + int p = head; + int n = flushed.length; + int r = n - p; // number of elements to the right of p + int newCapacity = n << 1; + if (newCapacity < 0) { + throw new IllegalStateException(); + } + + Object[] a1 = new Object[newCapacity]; + System.arraycopy(flushed, p, a1, 0, r); + System.arraycopy(flushed, 0, a1, r, p); + flushed = a1; + + ChannelPromise[] a2 = new ChannelPromise[newCapacity]; + System.arraycopy(flushedPromises, p, a2, 0, r); + System.arraycopy(flushedPromises, 0, a2, r, p); + flushedPromises = a2; + + long[] a3 = new long[newCapacity]; + System.arraycopy(flushedProgresses, p, a3, 0, r); + System.arraycopy(flushedProgresses, 0, a3, r, p); + flushedProgresses = a3; + + long[] a4 = new long[newCapacity]; + System.arraycopy(flushedTotals, p, a4, 0, r); + System.arraycopy(flushedTotals, 0, a4, r, p); + flushedTotals = a4; + + head = 0; + tail = n; } private void incrementPendingOutboundBytes(int size) { @@ -148,54 +238,58 @@ final class ChannelOutboundBuffer { } } - private void doubleCapacity() { - assert head == tail; - - int p = head; - int n = messageLists.length; - int r = n - p; // number of elements to the right of p - int newCapacity = n << 1; - if (newCapacity < 0) { - throw new IllegalStateException("Sorry, deque too big"); - } - - @SuppressWarnings("unchecked") - MessageList[] a1 = new MessageList[newCapacity]; - System.arraycopy(messageLists, p, a1, 0, r); - System.arraycopy(messageLists, 0, a1, r, p); - messageLists = a1; - - long[] a2 = new long[newCapacity]; - System.arraycopy(messageListSizes, p, a2, 0, r); - System.arraycopy(messageListSizes, 0, a2, r, p); - messageListSizes = a2; - - head = 0; - tail = n; + public Object current() { + return flushed[head]; } - boolean next() { - // FIXME: pendingOutboundBytes should be decreased when the messages are flushed. + public void progress(long amount) { + int head = this.head; + ChannelPromise p = flushedPromises[head]; + if (p instanceof ChannelProgressivePromise) { + long progress = flushedProgresses[head] + amount; + flushedProgresses[head] = progress; + ((ChannelProgressivePromise) p).tryProgress(progress, flushedTotals[head]); + } + } - decrementPendingOutboundBytes(currentMessageListSize); + public boolean remove() { + int head = this.head; - int h = head; - - MessageList e = messageLists[h]; // Element is null if deque empty - if (e == null) { - currentMessageListSize = 0; - currentMessageList = null; + Object msg = flushed[head]; + if (msg == null) { return false; } - currentMessageList = messageLists[h]; - currentMessageIndex = 0; - currentMessageListSize = messageListSizes[h]; + safeRelease(msg); + flushed[head] = null; - messageLists[h] = null; - messageListSizes[h] = 0; + ChannelPromise promise = flushedPromises[head]; + promise.trySuccess(); + flushedPromises[head] = null; - head = h + 1 & messageLists.length - 1; + decrementPendingOutboundBytes(flushedTotals[head]); + + this.head = head + 1 & flushed.length - 1; + return true; + } + + public boolean remove(Throwable cause) { + int head = this.head; + + Object msg = flushed[head]; + if (msg == null) { + return false; + } + + safeRelease(msg); + flushed[head] = null; + + safeFail(flushedPromises[head], cause); + flushedPromises[head] = null; + + decrementPendingOutboundBytes(flushedTotals[head]); + + this.head = head + 1 & flushed.length - 1; return true; } @@ -203,43 +297,38 @@ final class ChannelOutboundBuffer { return WRITABLE_UPDATER.get(this) == 1; } - int size() { - return tail - head & messageLists.length - 1; + public int size() { + return tail - head & flushed.length - 1; } - boolean isEmpty() { + public boolean isEmpty() { return head == tail; } - void clearUnflushed(Throwable cause) { + void failUnflushed(Throwable cause) { if (inFail) { return; } - MessageList unflushed = unflushedMessageList; - if (unflushed == null) { - return; - } - inFail = true; // Release all unflushed messages. - Object[] messages = unflushed.messages(); - ChannelPromise[] promises = unflushed.promises(); - final int size = unflushed.size(); + Object[] unflushed = this.unflushed; + ChannelPromise[] unflushedPromises = this.unflushedPromises; + long[] unflushedTotals = this.unflushedTotals; + final int unflushedCount = this.unflushedCount; try { - for (int i = 0; i < size; i++) { - safeRelease(messages[i], promises[i], cause); + for (int i = 0; i < unflushedCount; i++) { + safeRelease(unflushed[i]); + safeFail(unflushedPromises[i], cause); + decrementPendingOutboundBytes(unflushedTotals[i]); } } finally { - unflushed.recycle(); - decrementPendingOutboundBytes(unflushedMessageListSize); - unflushedMessageListSize = 0; inFail = false; } } - void fail(Throwable cause) { + void failFlushed(Throwable cause) { // Make sure that this method does not reenter. A listener added to the current promise can be notified by the // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call // indirectly (usually by closing the channel.) @@ -251,49 +340,27 @@ final class ChannelOutboundBuffer { try { inFail = true; - if (currentMessageList == null) { - if (!next()) { - return; + for (;;) { + if (!remove(cause)) { + break; } } - - do { - if (currentMessageList != null) { - // Store a local reference of current messages - // This is needed as a promise may have a listener attached that will close the channel - // The close will call next() which will set currentMessages to null and so - // trigger a NPE in the finally block if no local reference is used. - // - // See https://github.com/netty/netty/issues/1573 - MessageList current = currentMessageList; - // Release all failed messages. - Object[] messages = current.messages(); - ChannelPromise[] promises = current.promises(); - final int size = current.size(); - try { - for (int i = currentMessageIndex; i < size; i++) { - safeRelease(messages[i], promises[i], cause); - } - } finally { - current.recycle(); - } - } - } while(next()); } finally { inFail = false; } } - private static void safeRelease(Object message, ChannelPromise promise, Throwable cause) { + private static void safeRelease(Object message) { try { ReferenceCountUtil.release(message); } catch (Throwable t) { logger.warn("Failed to release a message.", t); } + } - ChannelPromise p = promise; - if (!(p instanceof VoidChannelPromise) && !p.tryFailure(cause)) { - logger.warn("Promise done already: {} - new exception is:", p, cause); + private static void safeFail(ChannelPromise promise, Throwable cause) { + if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) { + logger.warn("Promise done already: {} - new exception is:", promise, cause); } } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index c5494ea852..f780a010b8 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -346,11 +346,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements throw new NullPointerException("msg"); } - // FIXME: Remove once refactoring is done. - if (msg instanceof MessageList) { - throw new IllegalStateException(); - } - final DefaultChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { diff --git a/transport/src/main/java/io/netty/channel/MessageList.java b/transport/src/main/java/io/netty/channel/MessageList.java deleted file mode 100644 index c4b911a25a..0000000000 --- a/transport/src/main/java/io/netty/channel/MessageList.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.channel; - -import io.netty.util.Recycler; -import io.netty.util.Recycler.Handle; - -import java.util.Arrays; - -/** - * A simple array-backed list that holds one or more messages. - */ -final class MessageList { - - private static final int DEFAULT_INITIAL_CAPACITY = 8; - private static final int MIN_INITIAL_CAPACITY = 4; - - private static final Recycler RECYCLER = new Recycler() { - @Override - protected MessageList newObject(Handle handle) { - return new MessageList(handle); - } - }; - - /** - * Create a new empty {@link MessageList} instance. - */ - static MessageList newInstance() { - MessageList ret = RECYCLER.get(); - return ret; - } - - private final Handle handle; - private Object[] messages; - private ChannelPromise[] promises; - private int size; - - MessageList(Handle handle) { - this(handle, DEFAULT_INITIAL_CAPACITY); - } - - MessageList(Handle handle, int initialCapacity) { - this.handle = handle; - initialCapacity = normalizeCapacity(initialCapacity); - messages = new Object[initialCapacity]; - promises = new ChannelPromise[initialCapacity]; - } - - /** - * Return the current size of this {@link MessageList} and so how many messages it holds. - */ - int size() { - return size; - } - - /** - * Return {@code true} if this {@link MessageList} is empty and so contains no messages. - */ - boolean isEmpty() { - return size == 0; - } - - /** - * Add the message to this {@link MessageList} and return itself. - */ - MessageList add(Object message, ChannelPromise promise) { - int oldSize = size; - int newSize = oldSize + 1; - ensureCapacity(newSize); - messages[oldSize] = message; - promises[oldSize] = promise; - size = newSize; - return this; - } - - /** - * Returns the backing array of this list. - */ - Object[] messages() { - return messages; - } - - ChannelPromise[] promises() { - return promises; - } - - /** - * Clear and recycle this instance. - */ - boolean recycle() { - Arrays.fill(messages, 0, size, null); - Arrays.fill(promises, 0, size, null); - size = 0; - return RECYCLER.recycle(this, handle); - } - - private void ensureCapacity(int capacity) { - if (messages.length >= capacity) { - return; - } - - final int size = this.size; - capacity = normalizeCapacity(capacity); - - Object[] newMessages = new Object[capacity]; - System.arraycopy(messages, 0, newMessages, 0, size); - messages = newMessages; - - ChannelPromise[] newPromises = new ChannelPromise[capacity]; - System.arraycopy(promises, 0, newPromises, 0, size); - promises = newPromises; - } - - private static int normalizeCapacity(int initialCapacity) { - if (initialCapacity <= MIN_INITIAL_CAPACITY) { - initialCapacity = MIN_INITIAL_CAPACITY; - } else { - initialCapacity |= initialCapacity >>> 1; - initialCapacity |= initialCapacity >>> 2; - initialCapacity |= initialCapacity >>> 4; - initialCapacity |= initialCapacity >>> 8; - initialCapacity |= initialCapacity >>> 16; - initialCapacity ++; - - if (initialCapacity < 0) { - initialCapacity >>>= 1; - } - } - return initialCapacity; - } -} diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 809c339695..16a684a393 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -23,10 +23,12 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelConfig; import io.netty.channel.EventLoop; +import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.RecyclableArrayList; import io.netty.util.internal.logging.InternalLogger; @@ -305,11 +307,17 @@ public class EmbeddedChannel extends AbstractChannel { } @Override - protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception { - for (int i = startIndex; i < msgsLength; i ++) { - lastOutboundBuffer.add(msgs[i]); + protected void doWrite(ChannelOutboundBuffer in) throws Exception { + for (;;) { + Object msg = in.current(); + if (msg == null) { + break; + } + + ReferenceCountUtil.retain(msg); + lastOutboundBuffer.add(msg); + in.remove(); } - return msgsLength - startIndex; } private class DefaultUnsafe extends AbstractUnsafe { diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index 63e045e44c..cc4acb2050 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -20,11 +20,13 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelException; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelConfig; import io.netty.channel.EventLoop; import io.netty.channel.SingleThreadEventLoop; +import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.SingleThreadEventExecutor; import java.net.SocketAddress; @@ -265,7 +267,7 @@ public class LocalChannel extends AbstractChannel { } @Override - protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception { + protected void doWrite(ChannelOutboundBuffer in) throws Exception { if (state < 2) { throw new NotYetConnectedException(); } @@ -278,14 +280,23 @@ public class LocalChannel extends AbstractChannel { final EventLoop peerLoop = peer.eventLoop(); if (peerLoop == eventLoop()) { - for (int i = startIndex; i < msgsLength; i ++) { - peer.inboundBuffer.add(msgs[i]); + for (;;) { + Object msg = in.current(); + if (msg == null) { + break; + } + peer.inboundBuffer.add(msg); + ReferenceCountUtil.retain(msg); + in.remove(); } finishPeerRead(peer, peerPipeline); } else { // Use a copy because the original msgs will be recycled by AbstractChannel. - final Object[] msgsCopy = new Object[msgsLength - startIndex]; - System.arraycopy(msgs, startIndex, msgsCopy, 0, msgsCopy.length); + final Object[] msgsCopy = new Object[in.size()]; + for (int i = 0; i < msgsCopy.length; i ++) { + msgsCopy[i] = ReferenceCountUtil.retain(in.current()); + in.remove(); + } peerLoop.execute(new Runnable() { @Override @@ -297,8 +308,6 @@ public class LocalChannel extends AbstractChannel { } }); } - - return msgsLength - startIndex; } private static void finishPeerRead(LocalChannel peer, ChannelPipeline peerPipeline) { diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 88711953cb..6b73caac04 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -20,6 +20,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.FileRegion; import io.netty.channel.RecvByteBufAllocator; @@ -140,23 +141,29 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } @Override - protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception { - int writeIndex = startIndex; + protected void doWrite(ChannelOutboundBuffer in) throws Exception { + final SelectionKey key = selectionKey(); + final int interestOps = key.interestOps(); for (;;) { - if (writeIndex >= msgsLength) { + Object msg = in.current(); + if (msg == null) { + // Wrote all messages. + if ((interestOps & SelectionKey.OP_WRITE) != 0) { + key.interestOps(interestOps & ~SelectionKey.OP_WRITE); + } break; } - Object msg = msgs[writeIndex]; + if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; if (!buf.isReadable()) { - buf.release(); - writeIndex++; + in.remove(); continue; } + boolean done = false; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { - int localFlushedAmount = doWriteBytes(buf, i == 0); + int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount == 0) { break; } @@ -168,16 +175,19 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } if (done) { - buf.release(); - writeIndex++; + in.remove(); } else { + // Did not write completely. + if ((interestOps & SelectionKey.OP_WRITE) == 0) { + key.interestOps(interestOps | SelectionKey.OP_WRITE); + } break; } } else if (msg instanceof FileRegion) { FileRegion region = (FileRegion) msg; boolean done = false; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { - long localFlushedAmount = doWriteFileRegion(region, i == 0); + long localFlushedAmount = doWriteFileRegion(region); if (localFlushedAmount == 0) { break; } @@ -188,27 +198,27 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } if (done) { - region.release(); - writeIndex++; + in.remove(); } else { + // Did not write completely. + if ((interestOps & SelectionKey.OP_WRITE) == 0) { + key.interestOps(interestOps | SelectionKey.OP_WRITE); + } break; } } else { throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg)); } } - return writeIndex - startIndex; } /** * Write a {@link FileRegion} * * @param region the {@link FileRegion} from which the bytes should be written - * @param lastSpin {@code true} if this is the last write try * @return amount the amount of written bytes - * @throws Exception thrown if an error accour */ - protected abstract long doWriteFileRegion(FileRegion region, boolean lastSpin) throws Exception; + protected abstract long doWriteFileRegion(FileRegion region) throws Exception; /** * Read bytes into the given {@link ByteBuf} and return the amount. @@ -218,11 +228,9 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { /** * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. * @param buf the {@link ByteBuf} from which the bytes should be written - * @param lastSpin {@code true} if this is the last write try * @return amount the amount of written bytes - * @throws Exception thrown if an error accour */ - protected abstract int doWriteBytes(ByteBuf buf, boolean lastSpin) throws Exception; + protected abstract int doWriteBytes(ByteBuf buf) throws Exception; protected final void updateOpWrite(long expectedWrittenBytes, long writtenBytes, boolean lastSpin) { if (writtenBytes >= expectedWrittenBytes) { diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index 9131e8bbae..6c0e1c8333 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -17,6 +17,7 @@ package io.netty.channel.nio; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ServerChannel; @@ -109,15 +110,38 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { } @Override - protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception { - final int writeSpinCount = config().getWriteSpinCount() - 1; - for (int i = writeSpinCount; i >= 0; i --) { - int written = doWriteMessages(msgs, msgsLength, startIndex, i == 0); - if (written > 0) { - return written; + protected void doWrite(ChannelOutboundBuffer in) throws Exception { + final SelectionKey key = selectionKey(); + final int interestOps = key.interestOps(); + + for (;;) { + Object msg = in.current(); + if (msg == null) { + // Wrote all messages. + if ((interestOps & SelectionKey.OP_WRITE) != 0) { + key.interestOps(interestOps & ~SelectionKey.OP_WRITE); + } + break; + } + + boolean done = false; + for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { + if (doWriteMessage(msg)) { + done = true; + break; + } + } + + if (done) { + in.remove(); + } else { + // Did not write all messages. + if ((interestOps & SelectionKey.OP_WRITE) == 0) { + key.interestOps(interestOps | SelectionKey.OP_WRITE); + } + break; } } - return 0; } /** @@ -126,10 +150,9 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { protected abstract int doReadMessages(List buf) throws Exception; /** - * Write messages to the underlying {@link java.nio.channels.Channel}. - * @param lastSpin {@code true} if this is the last write try - * @return the number of written messages + * Write a message to the underlying {@link java.nio.channels.Channel}. + * + * @return {@code true} if and only if the message has been written */ - protected abstract int doWriteMessages( - Object[] msgs, int msgLength, int startIndex, boolean lastSpin) throws Exception; + protected abstract boolean doWriteMessage(Object msg) throws Exception; } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java index 185b1ebcb5..cc682c1ff7 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java @@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.FileRegion; import io.netty.channel.socket.ChannelInputShutdownEvent; @@ -152,30 +153,27 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { } @Override - protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception { - int writeIndex = startIndex; + protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { - if (writeIndex >= msgsLength) { + Object msg = in.current(); + if (in == null) { break; } - Object msg = msgs[writeIndex]; + if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; while (buf.isReadable()) { doWriteBytes(buf); } - buf.release(); - writeIndex++; + in.remove(); } else if (msg instanceof FileRegion) { - FileRegion region = (FileRegion) msg; - doWriteFileRegion(region); - region.release(); - writeIndex++; + doWriteFileRegion((FileRegion) msg); + in.remove(); } else { - throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg)); + in.remove(new UnsupportedOperationException( + "unsupported message type: " + StringUtil.simpleClassName(msg))); } } - return writeIndex - startIndex; } /** diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index 7d1f52fb95..24a2b3748e 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -28,7 +28,6 @@ import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.socket.DatagramChannelConfig; import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.InternetProtocolFamily; -import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; @@ -224,18 +223,17 @@ public final class NioDatagramChannel } @Override - protected int doWriteMessages(Object[] msgs, int msgsLength, int startIndex, boolean lastSpin) throws Exception { - final Object o = msgs[startIndex]; + protected boolean doWriteMessage(Object msg) throws Exception { final Object m; final ByteBuf data; final SocketAddress remoteAddress; - if (o instanceof AddressedEnvelope) { + if (msg instanceof AddressedEnvelope) { @SuppressWarnings("unchecked") - AddressedEnvelope envelope = (AddressedEnvelope) o; + AddressedEnvelope envelope = (AddressedEnvelope) msg; remoteAddress = envelope.recipient(); m = envelope.content(); } else { - m = o; + m = msg; remoteAddress = null; } @@ -244,10 +242,14 @@ public final class NioDatagramChannel } else if (m instanceof ByteBuf) { data = (ByteBuf) m; } else { - throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(0)); + throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg)); } int dataLen = data.readableBytes(); + if (dataLen == 0) { + return true; + } + ByteBuffer nioData; if (data.nioBufferCount() == 1) { nioData = data.nioBuffer(); @@ -264,32 +266,7 @@ public final class NioDatagramChannel writtenBytes = javaChannel().write(nioData); } - final SelectionKey key = selectionKey(); - final int interestOps = key.interestOps(); - if (writtenBytes <= 0 && dataLen > 0) { - // Did not write a packet. - // 1) If 'lastSpin' is false, the caller will call this method again real soon. - // - Do not update OP_WRITE. - // 2) If 'lastSpin' is true, the caller will not retry. - // - Set OP_WRITE so that the event loop calls flushForcibly() later. - if (lastSpin) { - if ((interestOps & SelectionKey.OP_WRITE) == 0) { - key.interestOps(interestOps | SelectionKey.OP_WRITE); - } - } - return 0; - } - - // Wrote a packet - free the message. - ReferenceCountUtil.release(o); - - if (startIndex + 1 == msgsLength) { - // Wrote the outbound buffer completely - clear OP_WRITE. - if ((interestOps & SelectionKey.OP_WRITE) != 0) { - key.interestOps(interestOps & ~SelectionKey.OP_WRITE); - } - } - return 1; + return writtenBytes > 0; } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index fe3f20dc02..3faf971281 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -151,7 +151,7 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel } @Override - protected int doWriteMessages(Object[] msgs, int msgsLength, int startIndex, boolean lastSpin) throws Exception { + protected boolean doWriteMessage(Object msg) throws Exception { throw new UnsupportedOperationException(); } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index b2c201d64c..95036c063e 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -20,6 +20,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; import io.netty.channel.FileRegion; @@ -27,7 +28,6 @@ import io.netty.channel.nio.AbstractNioByteChannel; import io.netty.channel.socket.DefaultSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannelConfig; -import io.netty.util.ReferenceCounted; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -247,27 +247,29 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty } @Override - protected int doWriteBytes(ByteBuf buf, boolean lastSpin) throws Exception { + protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); final int writtenBytes = buf.readBytes(javaChannel(), expectedWrittenBytes); - updateOpWrite(expectedWrittenBytes, writtenBytes, lastSpin); return writtenBytes; } @Override - protected long doWriteFileRegion(FileRegion region, boolean lastSpin) throws Exception { + protected long doWriteFileRegion(FileRegion region) throws Exception { final long position = region.transfered(); - final long expectedWrittenBytes = region.count() - position; final long writtenBytes = region.transferTo(javaChannel(), position); - updateOpWrite(expectedWrittenBytes, writtenBytes, lastSpin); return writtenBytes; } @Override - protected int doWrite(Object[] msgs, int msgsLength, final int startIndex) throws Exception { + protected void doWrite(ChannelOutboundBuffer in) throws Exception { + // FIXME: Re-enable gathering write. + super.doWrite(in); + + /* // Do non-gathering write for a single buffer case. - if (msgsLength <= 1) { - return super.doWrite(msgs, msgsLength, startIndex); + if (in.size() <= 1) { + super.doWrite(in); + return; } ByteBuffer[] nioBuffers = getNioBufferArray(); @@ -363,5 +365,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty } return writtenBufs; } + */ + } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java index e37a23f7c1..4d5c14eccd 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java @@ -22,6 +22,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.oio.AbstractOioMessageChannel; @@ -29,7 +30,6 @@ import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannelConfig; import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.DefaultDatagramChannelConfig; -import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; @@ -233,43 +233,48 @@ public class OioDatagramChannel extends AbstractOioMessageChannel } @Override - protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception { - final Object o = msgs[startIndex]; - final Object m; - final ByteBuf data; - final SocketAddress remoteAddress; - if (o instanceof AddressedEnvelope) { - @SuppressWarnings("unchecked") - AddressedEnvelope envelope = (AddressedEnvelope) o; - remoteAddress = envelope.recipient(); - m = envelope.content(); - } else { - m = o; - remoteAddress = null; - } + protected void doWrite(ChannelOutboundBuffer in) throws Exception { + for (;;) { + final Object o = in.current(); + if (o == null) { + break; + } - if (m instanceof ByteBufHolder) { - data = ((ByteBufHolder) m).content(); - } else if (m instanceof ByteBuf) { - data = (ByteBuf) m; - } else { - throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(o)); - } + final Object m; + final ByteBuf data; + final SocketAddress remoteAddress; + if (o instanceof AddressedEnvelope) { + @SuppressWarnings("unchecked") + AddressedEnvelope envelope = (AddressedEnvelope) o; + remoteAddress = envelope.recipient(); + m = envelope.content(); + } else { + m = o; + remoteAddress = null; + } - int length = data.readableBytes(); - if (remoteAddress != null) { - tmpPacket.setSocketAddress(remoteAddress); + if (m instanceof ByteBufHolder) { + data = ((ByteBufHolder) m).content(); + } else if (m instanceof ByteBuf) { + data = (ByteBuf) m; + } else { + throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(o)); + } + + int length = data.readableBytes(); + if (remoteAddress != null) { + tmpPacket.setSocketAddress(remoteAddress); + } + if (data.hasArray()) { + tmpPacket.setData(data.array(), data.arrayOffset() + data.readerIndex(), length); + } else { + byte[] tmp = new byte[length]; + data.getBytes(data.readerIndex(), tmp); + tmpPacket.setData(tmp); + } + socket.send(tmpPacket); + in.remove(); } - if (data.hasArray()) { - tmpPacket.setData(data.array(), data.arrayOffset() + data.readerIndex(), length); - } else { - byte[] tmp = new byte[length]; - data.getBytes(data.readerIndex(), tmp); - tmpPacket.setData(tmp); - } - socket.send(tmpPacket); - ReferenceCountUtil.release(o); - return 1; } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java index 0e32dc11b6..04428df54d 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java @@ -17,6 +17,7 @@ package io.netty.channel.socket.oio; import io.netty.channel.ChannelException; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.oio.AbstractOioMessageChannel; import io.netty.channel.socket.ServerSocketChannel; import io.netty.util.internal.logging.InternalLogger; @@ -173,7 +174,7 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel } @Override - protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception { + protected void doWrite(ChannelOutboundBuffer in) throws Exception { throw new UnsupportedOperationException(); } diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index f137525668..728c5e0bc0 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -23,6 +23,7 @@ import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.local.LocalServerChannel; +import io.netty.util.AbstractReferenceCounted; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; import org.junit.After; @@ -100,37 +101,10 @@ public class DefaultChannelPipelineTest { public void testFreeCalled() throws Exception { final CountDownLatch free = new CountDownLatch(1); - final ReferenceCounted holder = new ReferenceCounted() { + final ReferenceCounted holder = new AbstractReferenceCounted() { @Override - public int refCnt() { - return (int) free.getCount(); - } - - @Override - public ReferenceCounted retain() { - fail(); - return this; - } - - @Override - public ReferenceCounted retain(int increment) { - fail(); - return this; - } - - @Override - public boolean release() { - assertEquals(1, refCnt()); + protected void deallocate() { free.countDown(); - return true; - } - - @Override - public boolean release(int decrement) { - for (int i = 0; i < decrement; i ++) { - release(); - } - return true; } };