From b37a41a535e41b573c09a00226865561327fb01a Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 8 Jul 2016 10:22:11 +0200 Subject: [PATCH] Allow to get the number of bytes queued in PendingWriteQueue Motivation: For some use-cases it would be useful to know the number of bytes queued in the PendingWriteQueue without the need to dequeue them. Modifications: Add PendingWriteQueue.bytes(). Result: Be able to get the number of bytes queued. --- .../io/netty/channel/PendingWriteQueue.java | 18 ++++++++++++++++-- .../netty/channel/PendingWriteQueueTest.java | 1 + 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/PendingWriteQueue.java b/transport/src/main/java/io/netty/channel/PendingWriteQueue.java index 40566f249f..f148ba8c97 100644 --- a/transport/src/main/java/io/netty/channel/PendingWriteQueue.java +++ b/transport/src/main/java/io/netty/channel/PendingWriteQueue.java @@ -37,6 +37,7 @@ public final class PendingWriteQueue { private PendingWrite head; private PendingWrite tail; private int size; + private long bytes; public PendingWriteQueue(ChannelHandlerContext ctx) { if (ctx == null) { @@ -63,6 +64,15 @@ public final class PendingWriteQueue { return size; } + /** + * Returns the total number of bytes that are pending because of pending messages. This is only an estimate so + * it should only be treated as a hint. + */ + public long bytes() { + assert ctx.executor().inEventLoop(); + return bytes; + } + /** * Add the given {@code msg} and {@link ChannelPromise}. */ @@ -90,6 +100,7 @@ public final class PendingWriteQueue { tail = write; } size ++; + bytes += messageSize; // We need to guard against null as channel.unsafe().outboundBuffer() may returned null // if the channel was already closed when constructing the PendingWriteQueue. // See https://github.com/netty/netty/issues/3967 @@ -112,7 +123,7 @@ public final class PendingWriteQueue { for (PendingWrite write = head; write != null; write = head) { head = tail = null; size = 0; - + bytes = 0; while (write != null) { PendingWrite next = write.next; ReferenceCountUtil.safeRelease(write.msg); @@ -168,6 +179,7 @@ public final class PendingWriteQueue { // Guard against re-entrance by directly reset head = tail = null; size = 0; + bytes = 0; ChannelPromise p = ctx.newPromise(); PromiseCombiner combiner = new PromiseCombiner(); @@ -252,10 +264,12 @@ public final class PendingWriteQueue { // Guard against re-entrance by directly reset head = tail = null; size = 0; + bytes = 0; } else { head = next; size --; - assert size > 0; + bytes -= writeSize; + assert size > 0 && bytes >= 0; } } diff --git a/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java b/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java index 03fbd1c37d..5d7f53fc2d 100644 --- a/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java +++ b/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java @@ -172,6 +172,7 @@ public class PendingWriteQueueTest { private static void assertQueueEmpty(PendingWriteQueue queue) { assertTrue(queue.isEmpty()); assertEquals(0, queue.size()); + assertEquals(0, queue.bytes()); assertNull(queue.current()); assertNull(queue.removeAndWrite()); assertNull(queue.removeAndWriteAll());