/* * Copyright 2017 The Netty Project * * The Netty Project licenses this file to you under the Apache License, version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under * the License. */ package io.netty.channel; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.ArrayDeque; import static io.netty.util.ReferenceCountUtil.safeRelease; import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; import static io.netty.util.internal.PlatformDependent.throwException; @UnstableApi public abstract class AbstractCoalescingBufferQueue { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractCoalescingBufferQueue.class); private final ArrayDeque bufAndListenerPairs; private final PendingBytesTracker tracker; private int readableBytes; /** * Create a new instance. * * @param channel the {@link Channel} which will have the {@link Channel#isWritable()} reflect the amount of queued * buffers or {@code null} if there is no writability state updated. * @param initSize the initial size of the underlying queue. */ protected AbstractCoalescingBufferQueue(Channel channel, int initSize) { bufAndListenerPairs = new ArrayDeque(initSize); tracker = channel == null ? null : PendingBytesTracker.newTracker(channel); } /** * Add a buffer to the front of the queue and associate a promise with it that should be completed when * all the buffer's bytes have been consumed from the queue and written. * @param buf to add to the head of the queue * @param promise to complete when all the bytes have been consumed and written, can be void. */ public final void addFirst(ByteBuf buf, ChannelPromise promise) { addFirst(buf, toChannelFutureListener(promise)); } private void addFirst(ByteBuf buf, ChannelFutureListener listener) { if (listener != null) { bufAndListenerPairs.addFirst(listener); } bufAndListenerPairs.addFirst(buf); incrementReadableBytes(buf.readableBytes()); } /** * Add a buffer to the end of the queue. */ public final void add(ByteBuf buf) { add(buf, (ChannelFutureListener) null); } /** * Add a buffer to the end of the queue and associate a promise with it that should be completed when * all the buffer's bytes have been consumed from the queue and written. * @param buf to add to the tail of the queue * @param promise to complete when all the bytes have been consumed and written, can be void. */ public final void add(ByteBuf buf, ChannelPromise promise) { // buffers are added before promises so that we naturally 'consume' the entire buffer during removal // before we complete it's promise. add(buf, toChannelFutureListener(promise)); } /** * Add a buffer to the end of the queue and associate a listener with it that should be completed when * all the buffers bytes have been consumed from the queue and written. * @param buf to add to the tail of the queue * @param listener to notify when all the bytes have been consumed and written, can be {@code null}. */ public final void add(ByteBuf buf, ChannelFutureListener listener) { // buffers are added before promises so that we naturally 'consume' the entire buffer during removal // before we complete it's promise. bufAndListenerPairs.add(buf); if (listener != null) { bufAndListenerPairs.add(listener); } incrementReadableBytes(buf.readableBytes()); } /** * Remove the first {@link ByteBuf} from the queue. * @param aggregatePromise used to aggregate the promises and listeners for the returned buffer. * @return the first {@link ByteBuf} from the queue. */ public final ByteBuf removeFirst(ChannelPromise aggregatePromise) { Object entry = bufAndListenerPairs.poll(); if (entry == null) { return null; } assert entry instanceof ByteBuf; ByteBuf result = (ByteBuf) entry; decrementReadableBytes(result.readableBytes()); entry = bufAndListenerPairs.peek(); if (entry instanceof ChannelFutureListener) { aggregatePromise.addListener((ChannelFutureListener) entry); bufAndListenerPairs.poll(); } return result; } /** * Remove a {@link ByteBuf} from the queue with the specified number of bytes. Any added buffer who's bytes are * fully consumed during removal will have it's promise completed when the passed aggregate {@link ChannelPromise} * completes. * * @param alloc The allocator used if a new {@link ByteBuf} is generated during the aggregation process. * @param bytes the maximum number of readable bytes in the returned {@link ByteBuf}, if {@code bytes} is greater * than {@link #readableBytes} then a buffer of length {@link #readableBytes} is returned. * @param aggregatePromise used to aggregate the promises and listeners for the constituent buffers. * @return a {@link ByteBuf} composed of the enqueued buffers. */ public final ByteBuf remove(ByteBufAllocator alloc, int bytes, ChannelPromise aggregatePromise) { checkPositiveOrZero(bytes, "bytes"); checkNotNull(aggregatePromise, "aggregatePromise"); // Use isEmpty rather than readableBytes==0 as we may have a promise associated with an empty buffer. if (bufAndListenerPairs.isEmpty()) { return removeEmptyValue(); } bytes = Math.min(bytes, readableBytes); ByteBuf toReturn = null; ByteBuf entryBuffer = null; int originalBytes = bytes; try { for (;;) { Object entry = bufAndListenerPairs.poll(); if (entry == null) { break; } if (entry instanceof ChannelFutureListener) { aggregatePromise.addListener((ChannelFutureListener) entry); continue; } entryBuffer = (ByteBuf) entry; if (entryBuffer.readableBytes() > bytes) { // Add the buffer back to the queue as we can't consume all of it. bufAndListenerPairs.addFirst(entryBuffer); if (bytes > 0) { // Take a slice of what we can consume and retain it. entryBuffer = entryBuffer.readRetainedSlice(bytes); toReturn = toReturn == null ? composeFirst(alloc, entryBuffer) : compose(alloc, toReturn, entryBuffer); bytes = 0; } break; } else { bytes -= entryBuffer.readableBytes(); toReturn = toReturn == null ? composeFirst(alloc, entryBuffer) : compose(alloc, toReturn, entryBuffer); } entryBuffer = null; } } catch (Throwable cause) { safeRelease(entryBuffer); safeRelease(toReturn); aggregatePromise.setFailure(cause); throwException(cause); } decrementReadableBytes(originalBytes - bytes); return toReturn; } /** * The number of readable bytes. */ public final int readableBytes() { return readableBytes; } /** * Are there pending buffers in the queue. */ public final boolean isEmpty() { return bufAndListenerPairs.isEmpty(); } /** * Release all buffers in the queue and complete all listeners and promises. */ public final void releaseAndFailAll(ChannelOutboundInvoker invoker, Throwable cause) { releaseAndCompleteAll(invoker.newFailedFuture(cause)); } /** * Copy all pending entries in this queue into the destination queue. * @param dest to copy pending buffers to. */ public final void copyTo(AbstractCoalescingBufferQueue dest) { dest.bufAndListenerPairs.addAll(bufAndListenerPairs); dest.incrementReadableBytes(readableBytes); } /** * Writes all remaining elements in this queue. * @param ctx The context to write all elements to. */ public final void writeAndRemoveAll(ChannelHandlerContext ctx) { decrementReadableBytes(readableBytes); Throwable pending = null; ByteBuf previousBuf = null; for (;;) { Object entry = bufAndListenerPairs.poll(); try { if (entry == null) { if (previousBuf != null) { ctx.write(previousBuf, ctx.voidPromise()); } break; } if (entry instanceof ByteBuf) { if (previousBuf != null) { ctx.write(previousBuf, ctx.voidPromise()); } previousBuf = (ByteBuf) entry; } else if (entry instanceof ChannelPromise) { ctx.write(previousBuf, (ChannelPromise) entry); previousBuf = null; } else { ctx.write(previousBuf).addListener((ChannelFutureListener) entry); previousBuf = null; } } catch (Throwable t) { if (pending == null) { pending = t; } else { logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t); } } } if (pending != null) { throw new IllegalStateException(pending); } } /** * Calculate the result of {@code current + next}. */ protected abstract ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next); /** * Compose {@code cumulation} and {@code next} into a new {@link CompositeByteBuf}. */ protected final ByteBuf composeIntoComposite(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) { // Create a composite buffer to accumulate this pair and potentially all the buffers // in the queue. Using +2 as we have already dequeued current and next. CompositeByteBuf composite = alloc.compositeBuffer(size() + 2); try { composite.addComponent(true, cumulation); composite.addComponent(true, next); } catch (Throwable cause) { composite.release(); safeRelease(next); throwException(cause); } return composite; } /** * Compose {@code cumulation} and {@code next} into a new {@link ByteBufAllocator#ioBuffer()}. * @param alloc The allocator to use to allocate the new buffer. * @param cumulation The current cumulation. * @param next The next buffer. * @return The result of {@code cumulation + next}. */ protected final ByteBuf copyAndCompose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) { ByteBuf newCumulation = alloc.ioBuffer(cumulation.readableBytes() + next.readableBytes()); try { newCumulation.writeBytes(cumulation).writeBytes(next); } catch (Throwable cause) { newCumulation.release(); safeRelease(next); throwException(cause); } cumulation.release(); next.release(); return newCumulation; } /** * Calculate the first {@link ByteBuf} which will be used in subsequent calls to * {@link #compose(ByteBufAllocator, ByteBuf, ByteBuf)}. */ protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first) { return first; } /** * The value to return when {@link #remove(ByteBufAllocator, int, ChannelPromise)} is called but the queue is empty. * @return the {@link ByteBuf} which represents an empty queue. */ protected abstract ByteBuf removeEmptyValue(); /** * Get the number of elements in this queue added via one of the {@link #add(ByteBuf)} methods. * @return the number of elements in this queue. */ protected final int size() { return bufAndListenerPairs.size(); } private void releaseAndCompleteAll(ChannelFuture future) { decrementReadableBytes(readableBytes); Throwable pending = null; for (;;) { Object entry = bufAndListenerPairs.poll(); if (entry == null) { break; } try { if (entry instanceof ByteBuf) { safeRelease(entry); } else { ((ChannelFutureListener) entry).operationComplete(future); } } catch (Throwable t) { if (pending == null) { pending = t; } else { logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t); } } } if (pending != null) { throw new IllegalStateException(pending); } } private void incrementReadableBytes(int increment) { int nextReadableBytes = readableBytes + increment; if (nextReadableBytes < readableBytes) { throw new IllegalStateException("buffer queue length overflow: " + readableBytes + " + " + increment); } readableBytes = nextReadableBytes; if (tracker != null) { tracker.incrementPendingOutboundBytes(increment); } } private void decrementReadableBytes(int decrement) { readableBytes -= decrement; assert readableBytes >= 0; if (tracker != null) { tracker.decrementPendingOutboundBytes(decrement); } } private static ChannelFutureListener toChannelFutureListener(ChannelPromise promise) { return promise.isVoid() ? null : new DelegatingChannelPromiseNotifier(promise); } }