diff --git a/transport/src/main/java/io/netty/channel/PendingWriteQueue.java b/transport/src/main/java/io/netty/channel/PendingWriteQueue.java index da1241f395..20d0e0dd6d 100644 --- a/transport/src/main/java/io/netty/channel/PendingWriteQueue.java +++ b/transport/src/main/java/io/netty/channel/PendingWriteQueue.java @@ -16,6 +16,7 @@ package io.netty.channel; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.PromiseCombiner; import io.netty.util.internal.ObjectPool; import io.netty.util.internal.ObjectPool.ObjectCreator; @@ -38,7 +39,8 @@ public final class PendingWriteQueue { private static final int PENDING_WRITE_OVERHEAD = SystemPropertyUtil.getInt("io.netty.transport.pendingWriteSizeOverhead", 64); - private final ChannelHandlerContext ctx; + private final ChannelOutboundInvoker invoker; + private final EventExecutor executor; private final PendingBytesTracker tracker; // head and tail pointers for the linked-list structure. If empty head and tail are null. @@ -49,14 +51,21 @@ public final class PendingWriteQueue { public PendingWriteQueue(ChannelHandlerContext ctx) { tracker = PendingBytesTracker.newTracker(ctx.channel()); - this.ctx = ctx; + this.invoker = ctx; + this.executor = ctx.executor(); + } + + public PendingWriteQueue(Channel channel) { + tracker = PendingBytesTracker.newTracker(channel); + this.invoker = channel; + this.executor = channel.eventLoop(); } /** * Returns {@code true} if there are no pending write operations left in this queue. */ public boolean isEmpty() { - assert ctx.executor().inEventLoop(); + assert executor.inEventLoop(); return head == null; } @@ -64,7 +73,7 @@ public final class PendingWriteQueue { * Returns the number of pending write operations. */ public int size() { - assert ctx.executor().inEventLoop(); + assert executor.inEventLoop(); return size; } @@ -73,7 +82,7 @@ public final class PendingWriteQueue { * it should only be treated as a hint. */ public long bytes() { - assert ctx.executor().inEventLoop(); + assert executor.inEventLoop(); return bytes; } @@ -92,7 +101,7 @@ public final class PendingWriteQueue { * Add the given {@code msg} and {@link ChannelPromise}. */ public void add(Object msg, ChannelPromise promise) { - assert ctx.executor().inEventLoop(); + assert executor.inEventLoop(); ObjectUtil.checkNotNull(msg, "msg"); ObjectUtil.checkNotNull(promise, "promise"); // It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering, @@ -120,14 +129,14 @@ public final class PendingWriteQueue { * if the {@link PendingWriteQueue} is empty. */ public ChannelFuture removeAndWriteAll() { - assert ctx.executor().inEventLoop(); + assert executor.inEventLoop(); if (isEmpty()) { return null; } - ChannelPromise p = ctx.newPromise(); - PromiseCombiner combiner = new PromiseCombiner(ctx.executor()); + ChannelPromise p = invoker.newPromise(); + PromiseCombiner combiner = new PromiseCombiner(executor); try { // It is possible for some of the written promises to trigger more writes. The new writes // will "revive" the queue, so we need to write them up until the queue is empty. @@ -144,7 +153,7 @@ public final class PendingWriteQueue { if (!(promise instanceof VoidChannelPromise)) { combiner.add(promise); } - ctx.write(msg, promise); + invoker.write(msg, promise); write = next; } } @@ -161,7 +170,7 @@ public final class PendingWriteQueue { * via {@link ReferenceCountUtil#safeRelease(Object)}. */ public void removeAndFailAll(Throwable cause) { - assert ctx.executor().inEventLoop(); + assert executor.inEventLoop(); ObjectUtil.checkNotNull(cause, "cause"); // It is possible for some of the failed promises to trigger more writes. The new writes // will "revive" the queue, so we need to clean them up until the queue is empty. @@ -186,7 +195,7 @@ public final class PendingWriteQueue { * {@link ReferenceCountUtil#safeRelease(Object)}. */ public void removeAndFail(Throwable cause) { - assert ctx.executor().inEventLoop(); + assert executor.inEventLoop(); ObjectUtil.checkNotNull(cause, "cause"); PendingWrite write = head; @@ -211,7 +220,7 @@ public final class PendingWriteQueue { * if the {@link PendingWriteQueue} is empty. */ public ChannelFuture removeAndWrite() { - assert ctx.executor().inEventLoop(); + assert executor.inEventLoop(); PendingWrite write = head; if (write == null) { return null; @@ -219,7 +228,7 @@ public final class PendingWriteQueue { Object msg = write.msg; ChannelPromise promise = write.promise; recycle(write, true); - return ctx.write(msg, promise); + return invoker.write(msg, promise); } /** @@ -229,7 +238,7 @@ public final class PendingWriteQueue { * */ public ChannelPromise remove() { - assert ctx.executor().inEventLoop(); + assert executor.inEventLoop(); PendingWrite write = head; if (write == null) { return null; @@ -244,7 +253,7 @@ public final class PendingWriteQueue { * Return the current message or {@code null} if empty. */ public Object current() { - assert ctx.executor().inEventLoop(); + assert executor.inEventLoop(); PendingWrite write = head; if (write == null) { return null;