/* * Copyright 2013 The Netty Project * * The Netty Project licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations * under the License. */ package io.netty.channel; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufHolder; import io.netty.buffer.Unpooled; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.InternalThreadLocalMap; import io.netty.util.internal.PromiseNotificationUtil; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.Arrays; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import static java.lang.Math.min; /** * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending * outbound write requests. *
* All methods must be called by a transport implementation from an I/O thread, except the following ones: *
* Note that the returned array is reused and thus should not escape * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}. * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example. *
*/
    public ByteBuffer[] nioBuffers() {
        // No caller-imposed limits: delegate with the largest count and byte budget an int can express.
        final int unboundedCount = Integer.MAX_VALUE;
        final long unboundedBytes = Integer.MAX_VALUE;
        return nioBuffers(unboundedCount, unboundedBytes);
    }

    /**
     * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
     * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
     * array and the total number of readable bytes of the NIO buffers respectively.
     * <p>
     * Note that the returned array is reused and thus should not escape
     * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
     * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
     *
* @param maxCount The maximum amount of buffers that will be added to the return value. * @param maxBytes A hint toward the maximum number of bytes to include as part of the return value. Note that this * value maybe exceeded because we make a best effort to include at least 1 {@link ByteBuffer} * in the return value to ensure write progress is made. */ public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) { assert maxCount > 0; assert maxBytes > 0; long nioBufferSize = 0; int nioBufferCount = 0; final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap); Entry entry = flushedEntry; while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) { if (!entry.cancelled) { ByteBuf buf = (ByteBuf) entry.msg; final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex; if (readableBytes > 0) { if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) { // If the nioBufferSize + readableBytes will overflow maxBytes, and there is at least one entry // we stop populate the ByteBuffer array. This is done for 2 reasons: // 1. bsd/osx don't allow to write more bytes then Integer.MAX_VALUE with one writev(...) call // and so will return 'EINVAL', which will raise an IOException. On Linux it may work depending // on the architecture and kernel but to be safe we also enforce the limit here. // 2. There is no sense in putting more data in the array than is likely to be accepted by the // OS. 
// // See also: // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2 // - http://linux.die.net/man/2/writev break; } nioBufferSize += readableBytes; int count = entry.count; if (count == -1) { //noinspection ConstantValueVariableUse entry.count = count = buf.nioBufferCount(); } int neededSpace = min(maxCount, nioBufferCount + count); if (neededSpace > nioBuffers.length) { nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); NIO_BUFFERS.set(threadLocalMap, nioBuffers); } if (count == 1) { ByteBuffer nioBuf = entry.buf; if (nioBuf == null) { // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a // derived buffer entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); } nioBuffers[nioBufferCount++] = nioBuf; } else { // The code exists in an extra method to ensure the method is not too big to inline as this // branch is not very likely to get hit very frequently. nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount); } if (nioBufferCount == maxCount) { break; } } } entry = entry.next; } this.nioBufferCount = nioBufferCount; this.nioBufferSize = nioBufferSize; return nioBuffers; } private static int nioBuffers(Entry entry, ByteBuf buf, ByteBuffer[] nioBuffers, int nioBufferCount, int maxCount) { ByteBuffer[] nioBufs = entry.bufs; if (nioBufs == null) { // cached ByteBuffers as they may be expensive to create in terms // of Object allocation entry.bufs = nioBufs = buf.nioBuffers(); } for (int i = 0; i < nioBufs.length && nioBufferCount < maxCount; ++i) { ByteBuffer nioBuf = nioBufs[i]; if (nioBuf == null) { break; } else if (!nioBuf.hasRemaining()) { continue; } nioBuffers[nioBufferCount++] = nioBuf; } return nioBufferCount; } private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) { int newCapacity = array.length; do { // double capacity until it is big enough // See https://github.com/netty/netty/issues/1890 newCapacity <<= 1; if 
(newCapacity < 0) { throw new IllegalStateException(); } } while (neededSpace > newCapacity); ByteBuffer[] newArray = new ByteBuffer[newCapacity]; System.arraycopy(array, 0, newArray, 0, size); return newArray; } /** * Returns the number of {@link ByteBuffer} that can be written out of the {@link ByteBuffer} array that was * obtained via {@link #nioBuffers()}. This method MUST be called after {@link #nioBuffers()} * was called. */ public int nioBufferCount() { return nioBufferCount; } /** * Returns the number of bytes that can be written out of the {@link ByteBuffer} array that was * obtained via {@link #nioBuffers()}. This method MUST be called after {@link #nioBuffers()} * was called. */ public long nioBufferSize() { return nioBufferSize; } /** * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did * not exceed the write watermark of the {@link Channel} and * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to * {@code false}. */ public boolean isWritable() { return unwritable == 0; } /** * Returns {@code true} if and only if the user-defined writability flag at the specified index is set to * {@code true}. */ public boolean getUserDefinedWritability(int index) { return (unwritable & writabilityMask(index)) == 0; } /** * Sets a user-defined writability flag at the specified index. 
*/ public void setUserDefinedWritability(int index, boolean writable) { if (writable) { setUserDefinedWritability(index); } else { clearUserDefinedWritability(index); } } private void setUserDefinedWritability(int index) { final int mask = ~writabilityMask(index); for (;;) { final int oldValue = unwritable; final int newValue = oldValue & mask; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue != 0 && newValue == 0) { fireChannelWritabilityChanged(true); } break; } } } private void clearUserDefinedWritability(int index) { final int mask = writabilityMask(index); for (;;) { final int oldValue = unwritable; final int newValue = oldValue | mask; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue == 0 && newValue != 0) { fireChannelWritabilityChanged(true); } break; } } } private static int writabilityMask(int index) { if (index < 1 || index > 31) { throw new IllegalArgumentException("index: " + index + " (expected: 1~31)"); } return 1 << index; } private void setWritable(boolean invokeLater) { for (;;) { final int oldValue = unwritable; final int newValue = oldValue & ~1; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue != 0 && newValue == 0) { fireChannelWritabilityChanged(invokeLater); } break; } } } private void setUnwritable(boolean invokeLater) { for (;;) { final int oldValue = unwritable; final int newValue = oldValue | 1; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue == 0 && newValue != 0) { fireChannelWritabilityChanged(invokeLater); } break; } } } private void fireChannelWritabilityChanged(boolean invokeLater) { final ChannelPipeline pipeline = channel.pipeline(); if (invokeLater) { Runnable task = fireChannelWritabilityChangedTask; if (task == null) { fireChannelWritabilityChangedTask = task = new Runnable() { @Override public void run() { pipeline.fireChannelWritabilityChanged(); } }; } channel.eventLoop().execute(task); } else { 
pipeline.fireChannelWritabilityChanged(); } } /** * Returns the number of flushed messages in this {@link ChannelOutboundBuffer}. */ public int size() { return flushed; } /** * Returns {@code true} if there are flushed messages in this {@link ChannelOutboundBuffer} or {@code false} * otherwise. */ public boolean isEmpty() { return flushed == 0; } void failFlushed(Throwable cause, boolean notify) { // Make sure that this method does not reenter. A listener added to the current promise can be notified by the // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call // indirectly (usually by closing the channel.) // // See https://github.com/netty/netty/issues/1501 if (inFail) { return; } try { inFail = true; for (;;) { if (!remove0(cause, notify)) { break; } } } finally { inFail = false; } } void close(final Throwable cause, final boolean allowChannelOpen) { if (inFail) { channel.eventLoop().execute(new Runnable() { @Override public void run() { close(cause, allowChannelOpen); } }); return; } inFail = true; if (!allowChannelOpen && channel.isOpen()) { throw new IllegalStateException("close() must be invoked after the channel is closed."); } if (!isEmpty()) { throw new IllegalStateException("close() must be invoked after all flushed writes are handled."); } // Release all unflushed messages. try { Entry e = unflushedEntry; while (e != null) { // Just decrease; do not trigger any events via decrementPendingOutboundBytes() int size = e.pendingSize; TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); if (!e.cancelled) { ReferenceCountUtil.safeRelease(e.msg); safeFail(e.promise, cause); } e = e.recycleAndGetNext(); } } finally { inFail = false; } clearNioBuffers(); } void close(ClosedChannelException cause) { close(cause, false); } private static void safeSuccess(ChannelPromise promise) { // Only log if the given promise is not of type VoidChannelPromise as trySuccess(...) is expected to return // false. 
PromiseNotificationUtil.trySuccess(promise, null, promise instanceof VoidChannelPromise ? null : logger); } private static void safeFail(ChannelPromise promise, Throwable cause) { // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return // false. PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger); } @Deprecated public void recycle() { // NOOP } public long totalPendingWriteBytes() { return totalPendingSize; } /** * Get how many bytes can be written until {@link #isWritable()} returns {@code false}. * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0. */ public long bytesBeforeUnwritable() { long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize; // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check writability. // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized // together. totalPendingSize will be updated before isWritable(). if (bytes > 0) { return isWritable() ? bytes : 0; } return 0; } /** * Get how many bytes must be drained from the underlying buffer until {@link #isWritable()} returns {@code true}. * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0. */ public long bytesBeforeWritable() { long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark(); // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability. // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized // together. totalPendingSize will be updated before isWritable(). if (bytes > 0) { return isWritable() ? 
0 : bytes; } return 0; } /** * Call {@link MessageProcessor#processMessage(Object)} for each flushed message * in this {@link ChannelOutboundBuffer} until {@link MessageProcessor#processMessage(Object)} * returns {@code false} or there are no more flushed messages to process. */ public void forEachFlushedMessage(MessageProcessor processor) throws Exception { if (processor == null) { throw new NullPointerException("processor"); } Entry entry = flushedEntry; if (entry == null) { return; } do { if (!entry.cancelled) { if (!processor.processMessage(entry.msg)) { return; } } entry = entry.next; } while (isFlushedEntry(entry)); } private boolean isFlushedEntry(Entry e) { return e != null && e != unflushedEntry; } public interface MessageProcessor { /** * Will be called for each flushed message until it either there are no more flushed messages or this * method returns {@code false}. */ boolean processMessage(Object msg) throws Exception; } static final class Entry { private static final Recycler