/* * Copyright 2013 The Netty Project * * The Netty Project licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations * under the License. */ package io.netty.channel; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufHolder; import io.netty.buffer.Unpooled; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.InternalThreadLocalMap; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; /** * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending * outbound write requests. *
* All methods must be called by a transport implementation from an I/O thread, except the following ones: *
* Note that the returned array is reused and thus should not escape * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}. * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example. *
*/ public ByteBuffer[] nioBuffers() { long nioBufferSize = 0; int nioBufferCount = 0; final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap); Entry entry = flushedEntry; while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) { if (!entry.cancelled) { ByteBuf buf = (ByteBuf) entry.msg; final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex; if (readableBytes > 0) { nioBufferSize += readableBytes; int count = entry.count; if (count == -1) { //noinspection ConstantValueVariableUse entry.count = count = buf.nioBufferCount(); } int neededSpace = nioBufferCount + count; if (neededSpace > nioBuffers.length) { nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); NIO_BUFFERS.set(threadLocalMap, nioBuffers); } if (count == 1) { ByteBuffer nioBuf = entry.buf; if (nioBuf == null) { // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a // derived buffer entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); } nioBuffers[nioBufferCount ++] = nioBuf; } else { ByteBuffer[] nioBufs = entry.bufs; if (nioBufs == null) { // cached ByteBuffers as they may be expensive to create in terms // of Object allocation entry.bufs = nioBufs = buf.nioBuffers(); } nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount); } } } entry = entry.next; } this.nioBufferCount = nioBufferCount; this.nioBufferSize = nioBufferSize; return nioBuffers; } private static int fillBufferArray(ByteBuffer[] nioBufs, ByteBuffer[] nioBuffers, int nioBufferCount) { for (ByteBuffer nioBuf: nioBufs) { if (nioBuf == null) { break; } nioBuffers[nioBufferCount ++] = nioBuf; } return nioBufferCount; } private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) { int newCapacity = array.length; do { // double capacity until it is big enough // See https://github.com/netty/netty/issues/1890 newCapacity <<= 1; if (newCapacity < 0) { throw new IllegalStateException(); } } while (neededSpace > newCapacity); ByteBuffer[] newArray = new ByteBuffer[newCapacity]; System.arraycopy(array, 0, newArray, 0, size); return newArray; } /** * Returns the number of {@link ByteBuffer} that can be written out of the {@link ByteBuffer} array that was * obtained via {@link #nioBuffers()}. This method MUST be called after {@link #nioBuffers()} * was called. */ public int nioBufferCount() { return nioBufferCount; } /** * Returns the number of bytes that can be written out of the {@link ByteBuffer} array that was * obtained via {@link #nioBuffers()}. This method MUST be called after {@link #nioBuffers()} * was called. */ public long nioBufferSize() { return nioBufferSize; } /** * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did * not exceed the write watermark of the {@link Channel} and * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to * {@code false}. */ public boolean isWritable() { return unwritable == 0; } /** * Returns {@code true} if and only if the user-defined writability flag at the specified index is set to * {@code true}. */ public boolean getUserDefinedWritability(int index) { return (unwritable & writabilityMask(index)) == 0; } /** * Sets a user-defined writability flag at the specified index. */ public void setUserDefinedWritability(int index, boolean writable) { if (writable) { setUserDefinedWritability(index); } else { clearUserDefinedWritability(index); } } private void setUserDefinedWritability(int index) { final int mask = ~writabilityMask(index); for (;;) { final int oldValue = unwritable; final int newValue = oldValue & mask; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue != 0 && newValue == 0) { fireChannelWritabilityChanged(true); } break; } } } private void clearUserDefinedWritability(int index) { final int mask = writabilityMask(index); for (;;) { final int oldValue = unwritable; final int newValue = oldValue | mask; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue == 0 && newValue != 0) { fireChannelWritabilityChanged(true); } break; } } } private static int writabilityMask(int index) { if (index < 1 || index > 31) { throw new IllegalArgumentException("index: " + index + " (expected: 1~31)"); } return 1 << index; } private void setWritable(boolean invokeLater) { for (;;) { final int oldValue = unwritable; final int newValue = oldValue & ~1; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue != 0 && newValue == 0) { fireChannelWritabilityChanged(invokeLater); } break; } } } private void setUnwritable(boolean invokeLater) { for (;;) { final int oldValue = unwritable; final int newValue = oldValue | 1; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue == 0 && newValue != 0) { fireChannelWritabilityChanged(invokeLater); } break; } } } private void fireChannelWritabilityChanged(boolean invokeLater) { final ChannelPipeline pipeline = channel.pipeline(); if (invokeLater) { Runnable task = fireChannelWritabilityChangedTask; if (task == null) { fireChannelWritabilityChangedTask = task = new Runnable() { @Override public void run() { pipeline.fireChannelWritabilityChanged(); } }; } channel.eventLoop().execute(task); } else { pipeline.fireChannelWritabilityChanged(); } } /** * Returns the number of flushed messages in this {@link ChannelOutboundBuffer}. */ public int size() { return flushed; } /** * Returns {@code true} if there are flushed messages in this {@link ChannelOutboundBuffer} or {@code false} * otherwise. */ public boolean isEmpty() { return flushed == 0; } void failFlushed(Throwable cause, boolean notify) { // Make sure that this method does not reenter. A listener added to the current promise can be notified by the // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call // indirectly (usually by closing the channel.) // // See https://github.com/netty/netty/issues/1501 if (inFail) { return; } try { inFail = true; for (;;) { if (!remove0(cause, notify)) { break; } } } finally { inFail = false; } } void close(final ClosedChannelException cause) { if (inFail) { channel.eventLoop().execute(new Runnable() { @Override public void run() { close(cause); } }); return; } inFail = true; if (channel.isOpen()) { throw new IllegalStateException("close() must be invoked after the channel is closed."); } if (!isEmpty()) { throw new IllegalStateException("close() must be invoked after all flushed writes are handled."); } // Release all unflushed messages. try { Entry e = unflushedEntry; while (e != null) { // Just decrease; do not trigger any events via decrementPendingOutboundBytes() int size = e.pendingSize; TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); if (!e.cancelled) { ReferenceCountUtil.safeRelease(e.msg); safeFail(e.promise, cause); } e = e.recycleAndGetNext(); } } finally { inFail = false; } } private static void safeSuccess(ChannelPromise promise) { if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) { logger.warn("Failed to mark a promise as success because it is done already: {}", promise); } } private static void safeFail(ChannelPromise promise, Throwable cause) { if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) { logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause); } } @Deprecated public void recycle() { // NOOP } public long totalPendingWriteBytes() { return totalPendingSize; } /** * Call {@link MessageProcessor#processMessage(Object)} for each flushed message * in this {@link ChannelOutboundBuffer} until {@link MessageProcessor#processMessage(Object)} * returns {@code false} or there are no more flushed messages to process. */ public void forEachFlushedMessage(MessageProcessor processor) throws Exception { if (processor == null) { throw new NullPointerException("processor"); } Entry entry = flushedEntry; if (entry == null) { return; } do { if (!entry.cancelled) { if (!processor.processMessage(entry.msg)) { return; } } entry = entry.next; } while (isFlushedEntry(entry)); } private boolean isFlushedEntry(Entry e) { return e != null && e != unflushedEntry; } public interface MessageProcessor { /** * Will be called for each flushed message until it either there are no more flushed messages or this * method returns {@code false}. */ boolean processMessage(Object msg) throws Exception; } static final class Entry { private static final Recycler