Allow to create PendingWriteQueue via a Channel (#11055)

Motivation:

At the moment its only possible to create a PendingWriteQueue via a ChannelHandlerContext.

Modifications:

Add another constructor

Result:

More flexible usage of PendingWriteQueue
This commit is contained in:
Norman Maurer 2021-03-03 16:49:17 +01:00 committed by GitHub
parent 18c66a9d70
commit 8872c1454a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -16,6 +16,7 @@
package io.netty.channel; package io.netty.channel;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.PromiseCombiner; import io.netty.util.concurrent.PromiseCombiner;
import io.netty.util.internal.ObjectPool; import io.netty.util.internal.ObjectPool;
import io.netty.util.internal.ObjectPool.ObjectCreator; import io.netty.util.internal.ObjectPool.ObjectCreator;
@ -38,7 +39,8 @@ public final class PendingWriteQueue {
private static final int PENDING_WRITE_OVERHEAD = private static final int PENDING_WRITE_OVERHEAD =
SystemPropertyUtil.getInt("io.netty.transport.pendingWriteSizeOverhead", 64); SystemPropertyUtil.getInt("io.netty.transport.pendingWriteSizeOverhead", 64);
private final ChannelHandlerContext ctx; private final ChannelOutboundInvoker invoker;
private final EventExecutor executor;
private final PendingBytesTracker tracker; private final PendingBytesTracker tracker;
// head and tail pointers for the linked-list structure. If empty head and tail are null. // head and tail pointers for the linked-list structure. If empty head and tail are null.
@ -49,14 +51,21 @@ public final class PendingWriteQueue {
public PendingWriteQueue(ChannelHandlerContext ctx) { public PendingWriteQueue(ChannelHandlerContext ctx) {
tracker = PendingBytesTracker.newTracker(ctx.channel()); tracker = PendingBytesTracker.newTracker(ctx.channel());
this.ctx = ctx; this.invoker = ctx;
this.executor = ctx.executor();
}
public PendingWriteQueue(Channel channel) {
tracker = PendingBytesTracker.newTracker(channel);
this.invoker = channel;
this.executor = channel.eventLoop();
} }
/** /**
* Returns {@code true} if there are no pending write operations left in this queue. * Returns {@code true} if there are no pending write operations left in this queue.
*/ */
public boolean isEmpty() { public boolean isEmpty() {
assert ctx.executor().inEventLoop(); assert executor.inEventLoop();
return head == null; return head == null;
} }
@ -64,7 +73,7 @@ public final class PendingWriteQueue {
* Returns the number of pending write operations. * Returns the number of pending write operations.
*/ */
public int size() { public int size() {
assert ctx.executor().inEventLoop(); assert executor.inEventLoop();
return size; return size;
} }
@ -73,7 +82,7 @@ public final class PendingWriteQueue {
* it should only be treated as a hint. * it should only be treated as a hint.
*/ */
public long bytes() { public long bytes() {
assert ctx.executor().inEventLoop(); assert executor.inEventLoop();
return bytes; return bytes;
} }
@ -92,7 +101,7 @@ public final class PendingWriteQueue {
* Add the given {@code msg} and {@link ChannelPromise}. * Add the given {@code msg} and {@link ChannelPromise}.
*/ */
public void add(Object msg, ChannelPromise promise) { public void add(Object msg, ChannelPromise promise) {
assert ctx.executor().inEventLoop(); assert executor.inEventLoop();
ObjectUtil.checkNotNull(msg, "msg"); ObjectUtil.checkNotNull(msg, "msg");
ObjectUtil.checkNotNull(promise, "promise"); ObjectUtil.checkNotNull(promise, "promise");
// It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering, // It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering,
@ -120,14 +129,14 @@ public final class PendingWriteQueue {
* if the {@link PendingWriteQueue} is empty. * if the {@link PendingWriteQueue} is empty.
*/ */
public ChannelFuture removeAndWriteAll() { public ChannelFuture removeAndWriteAll() {
assert ctx.executor().inEventLoop(); assert executor.inEventLoop();
if (isEmpty()) { if (isEmpty()) {
return null; return null;
} }
ChannelPromise p = ctx.newPromise(); ChannelPromise p = invoker.newPromise();
PromiseCombiner combiner = new PromiseCombiner(ctx.executor()); PromiseCombiner combiner = new PromiseCombiner(executor);
try { try {
// It is possible for some of the written promises to trigger more writes. The new writes // It is possible for some of the written promises to trigger more writes. The new writes
// will "revive" the queue, so we need to write them up until the queue is empty. // will "revive" the queue, so we need to write them up until the queue is empty.
@ -144,7 +153,7 @@ public final class PendingWriteQueue {
if (!(promise instanceof VoidChannelPromise)) { if (!(promise instanceof VoidChannelPromise)) {
combiner.add(promise); combiner.add(promise);
} }
ctx.write(msg, promise); invoker.write(msg, promise);
write = next; write = next;
} }
} }
@ -161,7 +170,7 @@ public final class PendingWriteQueue {
* via {@link ReferenceCountUtil#safeRelease(Object)}. * via {@link ReferenceCountUtil#safeRelease(Object)}.
*/ */
public void removeAndFailAll(Throwable cause) { public void removeAndFailAll(Throwable cause) {
assert ctx.executor().inEventLoop(); assert executor.inEventLoop();
ObjectUtil.checkNotNull(cause, "cause"); ObjectUtil.checkNotNull(cause, "cause");
// It is possible for some of the failed promises to trigger more writes. The new writes // It is possible for some of the failed promises to trigger more writes. The new writes
// will "revive" the queue, so we need to clean them up until the queue is empty. // will "revive" the queue, so we need to clean them up until the queue is empty.
@ -186,7 +195,7 @@ public final class PendingWriteQueue {
* {@link ReferenceCountUtil#safeRelease(Object)}. * {@link ReferenceCountUtil#safeRelease(Object)}.
*/ */
public void removeAndFail(Throwable cause) { public void removeAndFail(Throwable cause) {
assert ctx.executor().inEventLoop(); assert executor.inEventLoop();
ObjectUtil.checkNotNull(cause, "cause"); ObjectUtil.checkNotNull(cause, "cause");
PendingWrite write = head; PendingWrite write = head;
@ -211,7 +220,7 @@ public final class PendingWriteQueue {
* if the {@link PendingWriteQueue} is empty. * if the {@link PendingWriteQueue} is empty.
*/ */
public ChannelFuture removeAndWrite() { public ChannelFuture removeAndWrite() {
assert ctx.executor().inEventLoop(); assert executor.inEventLoop();
PendingWrite write = head; PendingWrite write = head;
if (write == null) { if (write == null) {
return null; return null;
@ -219,7 +228,7 @@ public final class PendingWriteQueue {
Object msg = write.msg; Object msg = write.msg;
ChannelPromise promise = write.promise; ChannelPromise promise = write.promise;
recycle(write, true); recycle(write, true);
return ctx.write(msg, promise); return invoker.write(msg, promise);
} }
/** /**
@ -229,7 +238,7 @@ public final class PendingWriteQueue {
* *
*/ */
public ChannelPromise remove() { public ChannelPromise remove() {
assert ctx.executor().inEventLoop(); assert executor.inEventLoop();
PendingWrite write = head; PendingWrite write = head;
if (write == null) { if (write == null) {
return null; return null;
@ -244,7 +253,7 @@ public final class PendingWriteQueue {
* Return the current message or {@code null} if empty. * Return the current message or {@code null} if empty.
*/ */
public Object current() { public Object current() {
assert ctx.executor().inEventLoop(); assert executor.inEventLoop();
PendingWrite write = head; PendingWrite write = head;
if (write == null) { if (write == null) {
return null; return null;