Allow to get the number of bytes queued in PendingWriteQueue
Motivation: For some use-cases it would be useful to know the number of bytes queued in the PendingWriteQueue without the need to dequeue them. Modifications: Add PendingWriteQueue.bytes(). Result: Be able to get the number of bytes queued.
This commit is contained in:
parent
ce95c50444
commit
b37a41a535
@ -37,6 +37,7 @@ public final class PendingWriteQueue {
|
||||
private PendingWrite head;
|
||||
private PendingWrite tail;
|
||||
private int size;
|
||||
private long bytes;
|
||||
|
||||
public PendingWriteQueue(ChannelHandlerContext ctx) {
|
||||
if (ctx == null) {
|
||||
@ -63,6 +64,15 @@ public final class PendingWriteQueue {
|
||||
return size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the total number of bytes that are pending because of pending messages. This is only an estimate so
|
||||
* it should only be treated as a hint.
|
||||
*/
|
||||
public long bytes() {
|
||||
assert ctx.executor().inEventLoop();
|
||||
return bytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the given {@code msg} and {@link ChannelPromise}.
|
||||
*/
|
||||
@ -90,6 +100,7 @@ public final class PendingWriteQueue {
|
||||
tail = write;
|
||||
}
|
||||
size ++;
|
||||
bytes += messageSize;
|
||||
// We need to guard against null as channel.unsafe().outboundBuffer() may returned null
|
||||
// if the channel was already closed when constructing the PendingWriteQueue.
|
||||
// See https://github.com/netty/netty/issues/3967
|
||||
@ -112,7 +123,7 @@ public final class PendingWriteQueue {
|
||||
for (PendingWrite write = head; write != null; write = head) {
|
||||
head = tail = null;
|
||||
size = 0;
|
||||
|
||||
bytes = 0;
|
||||
while (write != null) {
|
||||
PendingWrite next = write.next;
|
||||
ReferenceCountUtil.safeRelease(write.msg);
|
||||
@ -168,6 +179,7 @@ public final class PendingWriteQueue {
|
||||
// Guard against re-entrance by directly reset
|
||||
head = tail = null;
|
||||
size = 0;
|
||||
bytes = 0;
|
||||
|
||||
ChannelPromise p = ctx.newPromise();
|
||||
PromiseCombiner combiner = new PromiseCombiner();
|
||||
@ -252,10 +264,12 @@ public final class PendingWriteQueue {
|
||||
// Guard against re-entrance by directly reset
|
||||
head = tail = null;
|
||||
size = 0;
|
||||
bytes = 0;
|
||||
} else {
|
||||
head = next;
|
||||
size --;
|
||||
assert size > 0;
|
||||
bytes -= writeSize;
|
||||
assert size > 0 && bytes >= 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -172,6 +172,7 @@ public class PendingWriteQueueTest {
|
||||
private static void assertQueueEmpty(PendingWriteQueue queue) {
|
||||
assertTrue(queue.isEmpty());
|
||||
assertEquals(0, queue.size());
|
||||
assertEquals(0, queue.bytes());
|
||||
assertNull(queue.current());
|
||||
assertNull(queue.removeAndWrite());
|
||||
assertNull(queue.removeAndWriteAll());
|
||||
|
Loading…
Reference in New Issue
Block a user