2014-08-10 13:40:41 +02:00
|
|
|
/*
|
|
|
|
* Copyright 2014 The Netty Project
|
|
|
|
*
|
|
|
|
* The Netty Project licenses this file to you under the Apache License,
|
|
|
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
|
|
* with the License. You may obtain a copy of the License at:
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
|
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
|
|
* License for the specific language governing permissions and limitations
|
|
|
|
* under the License.
|
|
|
|
*/
|
|
|
|
package io.netty.channel;
|
|
|
|
|
|
|
|
import io.netty.util.Recycler;
|
|
|
|
import io.netty.util.ReferenceCountUtil;
|
2016-03-09 23:39:53 +01:00
|
|
|
import io.netty.util.concurrent.PromiseCombiner;
|
2017-08-11 07:51:45 +02:00
|
|
|
import io.netty.util.internal.ObjectUtil;
|
2016-10-13 14:50:09 +02:00
|
|
|
import io.netty.util.internal.SystemPropertyUtil;
|
2014-08-10 13:40:41 +02:00
|
|
|
import io.netty.util.internal.logging.InternalLogger;
|
|
|
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* A queue of write operations which are pending for later execution. It also updates the
|
|
|
|
* {@linkplain Channel#isWritable() writability} of the associated {@link Channel}, so that
|
|
|
|
* the pending write operations are also considered to determine the writability.
|
|
|
|
*/
|
|
|
|
public final class PendingWriteQueue {
|
|
|
|
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PendingWriteQueue.class);
|
2016-10-13 14:50:09 +02:00
|
|
|
// Assuming a 64-bit JVM:
|
|
|
|
// - 16 bytes object header
|
|
|
|
// - 4 reference fields
|
|
|
|
// - 1 long fields
|
|
|
|
private static final int PENDING_WRITE_OVERHEAD =
|
|
|
|
SystemPropertyUtil.getInt("io.netty.transport.pendingWriteSizeOverhead", 64);
|
2014-08-10 13:40:41 +02:00
|
|
|
|
|
|
|
private final ChannelHandlerContext ctx;
|
2017-10-21 14:30:16 +02:00
|
|
|
private final PendingBytesTracker tracker;
|
2014-08-10 13:40:41 +02:00
|
|
|
|
|
|
|
// head and tail pointers for the linked-list structure. If empty head and tail are null.
|
|
|
|
private PendingWrite head;
|
|
|
|
private PendingWrite tail;
|
|
|
|
private int size;
|
2016-07-08 10:22:11 +02:00
|
|
|
private long bytes;
|
2014-08-10 13:40:41 +02:00
|
|
|
|
|
|
|
public PendingWriteQueue(ChannelHandlerContext ctx) {
|
2017-10-21 14:30:16 +02:00
|
|
|
tracker = PendingBytesTracker.newTracker(ctx.channel());
|
|
|
|
this.ctx = ctx;
|
2014-08-10 13:40:41 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Returns {@code true} if there are no pending write operations left in this queue.
|
|
|
|
*/
|
|
|
|
public boolean isEmpty() {
|
|
|
|
assert ctx.executor().inEventLoop();
|
|
|
|
return head == null;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Returns the number of pending write operations.
|
|
|
|
*/
|
|
|
|
public int size() {
|
|
|
|
assert ctx.executor().inEventLoop();
|
|
|
|
return size;
|
|
|
|
}
|
|
|
|
|
2016-07-08 10:22:11 +02:00
|
|
|
/**
|
|
|
|
* Returns the total number of bytes that are pending because of pending messages. This is only an estimate so
|
|
|
|
* it should only be treated as a hint.
|
|
|
|
*/
|
|
|
|
public long bytes() {
|
|
|
|
assert ctx.executor().inEventLoop();
|
|
|
|
return bytes;
|
|
|
|
}
|
|
|
|
|
2016-10-13 14:50:09 +02:00
|
|
|
private int size(Object msg) {
|
|
|
|
// It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering,
|
|
|
|
// we should add them to the queue and let removeAndFailAll() fail them later.
|
2017-08-11 07:51:45 +02:00
|
|
|
int messageSize = tracker.size(msg);
|
2016-10-13 14:50:09 +02:00
|
|
|
if (messageSize < 0) {
|
2017-04-19 22:37:03 +02:00
|
|
|
// Size may be unknown so just use 0
|
2016-10-13 14:50:09 +02:00
|
|
|
messageSize = 0;
|
|
|
|
}
|
|
|
|
return messageSize + PENDING_WRITE_OVERHEAD;
|
|
|
|
}
|
|
|
|
|
2014-08-10 13:40:41 +02:00
|
|
|
/**
|
|
|
|
* Add the given {@code msg} and {@link ChannelPromise}.
|
|
|
|
*/
|
|
|
|
public void add(Object msg, ChannelPromise promise) {
|
|
|
|
assert ctx.executor().inEventLoop();
|
|
|
|
if (msg == null) {
|
|
|
|
throw new NullPointerException("msg");
|
|
|
|
}
|
|
|
|
if (promise == null) {
|
|
|
|
throw new NullPointerException("promise");
|
|
|
|
}
|
2016-03-08 00:58:38 +01:00
|
|
|
// It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering,
|
|
|
|
// we should add them to the queue and let removeAndFailAll() fail them later.
|
2016-10-13 14:50:09 +02:00
|
|
|
int messageSize = size(msg);
|
|
|
|
|
2014-08-10 13:40:41 +02:00
|
|
|
PendingWrite write = PendingWrite.newInstance(msg, messageSize, promise);
|
|
|
|
PendingWrite currentTail = tail;
|
|
|
|
if (currentTail == null) {
|
|
|
|
tail = head = write;
|
|
|
|
} else {
|
|
|
|
currentTail.next = write;
|
|
|
|
tail = write;
|
|
|
|
}
|
|
|
|
size ++;
|
2016-07-08 10:22:11 +02:00
|
|
|
bytes += messageSize;
|
2017-08-11 07:51:45 +02:00
|
|
|
tracker.incrementPendingOutboundBytes(write.size);
|
2014-08-10 13:40:41 +02:00
|
|
|
}
|
|
|
|
|
2016-08-12 08:53:39 +02:00
|
|
|
/**
|
|
|
|
* Remove all pending write operation and performs them via
|
|
|
|
* {@link ChannelHandlerContext#write(Object, ChannelPromise)}.
|
|
|
|
*
|
|
|
|
* @return {@link ChannelFuture} if something was written and {@code null}
|
|
|
|
* if the {@link PendingWriteQueue} is empty.
|
|
|
|
*/
|
|
|
|
public ChannelFuture removeAndWriteAll() {
|
|
|
|
assert ctx.executor().inEventLoop();
|
|
|
|
|
|
|
|
if (isEmpty()) {
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
|
|
|
|
ChannelPromise p = ctx.newPromise();
|
2019-02-28 20:32:04 +01:00
|
|
|
PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
|
2016-08-12 08:53:39 +02:00
|
|
|
try {
|
|
|
|
// It is possible for some of the written promises to trigger more writes. The new writes
|
|
|
|
// will "revive" the queue, so we need to write them up until the queue is empty.
|
|
|
|
for (PendingWrite write = head; write != null; write = head) {
|
|
|
|
head = tail = null;
|
|
|
|
size = 0;
|
|
|
|
bytes = 0;
|
|
|
|
|
|
|
|
while (write != null) {
|
|
|
|
PendingWrite next = write.next;
|
|
|
|
Object msg = write.msg;
|
|
|
|
ChannelPromise promise = write.promise;
|
|
|
|
recycle(write, false);
|
2018-03-14 19:08:24 +01:00
|
|
|
if (!(promise instanceof VoidChannelPromise)) {
|
|
|
|
combiner.add(promise);
|
|
|
|
}
|
2016-08-12 08:53:39 +02:00
|
|
|
ctx.write(msg, promise);
|
|
|
|
write = next;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
combiner.finish(p);
|
|
|
|
} catch (Throwable cause) {
|
|
|
|
p.setFailure(cause);
|
|
|
|
}
|
|
|
|
assertEmpty();
|
|
|
|
return p;
|
|
|
|
}
|
|
|
|
|
2014-08-10 13:40:41 +02:00
|
|
|
/**
|
|
|
|
* Remove all pending write operation and fail them with the given {@link Throwable}. The message will be released
|
|
|
|
* via {@link ReferenceCountUtil#safeRelease(Object)}.
|
|
|
|
*/
|
|
|
|
public void removeAndFailAll(Throwable cause) {
|
|
|
|
assert ctx.executor().inEventLoop();
|
|
|
|
if (cause == null) {
|
|
|
|
throw new NullPointerException("cause");
|
|
|
|
}
|
2016-03-08 00:58:38 +01:00
|
|
|
// It is possible for some of the failed promises to trigger more writes. The new writes
|
|
|
|
// will "revive" the queue, so we need to clean them up until the queue is empty.
|
|
|
|
for (PendingWrite write = head; write != null; write = head) {
|
|
|
|
head = tail = null;
|
|
|
|
size = 0;
|
2016-07-08 10:22:11 +02:00
|
|
|
bytes = 0;
|
2016-03-08 00:58:38 +01:00
|
|
|
while (write != null) {
|
|
|
|
PendingWrite next = write.next;
|
|
|
|
ReferenceCountUtil.safeRelease(write.msg);
|
|
|
|
ChannelPromise promise = write.promise;
|
|
|
|
recycle(write, false);
|
|
|
|
safeFail(promise, cause);
|
|
|
|
write = next;
|
|
|
|
}
|
2014-08-10 13:40:41 +02:00
|
|
|
}
|
|
|
|
assertEmpty();
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Remove a pending write operation and fail it with the given {@link Throwable}. The message will be released via
|
|
|
|
* {@link ReferenceCountUtil#safeRelease(Object)}.
|
|
|
|
*/
|
|
|
|
public void removeAndFail(Throwable cause) {
|
|
|
|
assert ctx.executor().inEventLoop();
|
|
|
|
if (cause == null) {
|
|
|
|
throw new NullPointerException("cause");
|
|
|
|
}
|
|
|
|
PendingWrite write = head;
|
2015-01-30 20:27:27 +01:00
|
|
|
|
2014-08-10 13:40:41 +02:00
|
|
|
if (write == null) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
ReferenceCountUtil.safeRelease(write.msg);
|
|
|
|
ChannelPromise promise = write.promise;
|
|
|
|
safeFail(promise, cause);
|
2015-01-30 20:27:27 +01:00
|
|
|
recycle(write, true);
|
2014-08-10 13:40:41 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
private void assertEmpty() {
|
|
|
|
assert tail == null && head == null && size == 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Removes a pending write operation and performs it via
|
|
|
|
* {@link ChannelHandlerContext#write(Object, ChannelPromise)}.
|
|
|
|
*
|
|
|
|
* @return {@link ChannelFuture} if something was written and {@code null}
|
|
|
|
* if the {@link PendingWriteQueue} is empty.
|
|
|
|
*/
|
|
|
|
public ChannelFuture removeAndWrite() {
|
|
|
|
assert ctx.executor().inEventLoop();
|
|
|
|
PendingWrite write = head;
|
|
|
|
if (write == null) {
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
Object msg = write.msg;
|
|
|
|
ChannelPromise promise = write.promise;
|
2015-01-30 20:27:27 +01:00
|
|
|
recycle(write, true);
|
2014-08-10 13:40:41 +02:00
|
|
|
return ctx.write(msg, promise);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Removes a pending write operation and release it's message via {@link ReferenceCountUtil#safeRelease(Object)}.
|
|
|
|
*
|
|
|
|
* @return {@link ChannelPromise} of the pending write or {@code null} if the queue is empty.
|
|
|
|
*
|
|
|
|
*/
|
|
|
|
public ChannelPromise remove() {
|
|
|
|
assert ctx.executor().inEventLoop();
|
|
|
|
PendingWrite write = head;
|
|
|
|
if (write == null) {
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
ChannelPromise promise = write.promise;
|
|
|
|
ReferenceCountUtil.safeRelease(write.msg);
|
2015-01-30 20:27:27 +01:00
|
|
|
recycle(write, true);
|
2014-08-10 13:40:41 +02:00
|
|
|
return promise;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Return the current message or {@code null} if empty.
|
|
|
|
*/
|
|
|
|
public Object current() {
|
|
|
|
assert ctx.executor().inEventLoop();
|
|
|
|
PendingWrite write = head;
|
|
|
|
if (write == null) {
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
return write.msg;
|
|
|
|
}
|
|
|
|
|
2015-01-30 20:27:27 +01:00
|
|
|
private void recycle(PendingWrite write, boolean update) {
|
2014-12-07 15:24:19 +01:00
|
|
|
final PendingWrite next = write.next;
|
|
|
|
final long writeSize = write.size;
|
2014-08-10 13:40:41 +02:00
|
|
|
|
2015-01-30 20:27:27 +01:00
|
|
|
if (update) {
|
|
|
|
if (next == null) {
|
|
|
|
// Handled last PendingWrite so rest head and tail
|
|
|
|
// Guard against re-entrance by directly reset
|
|
|
|
head = tail = null;
|
|
|
|
size = 0;
|
2016-07-08 10:22:11 +02:00
|
|
|
bytes = 0;
|
2015-01-30 20:27:27 +01:00
|
|
|
} else {
|
|
|
|
head = next;
|
|
|
|
size --;
|
2016-07-08 10:22:11 +02:00
|
|
|
bytes -= writeSize;
|
|
|
|
assert size > 0 && bytes >= 0;
|
2015-01-30 20:27:27 +01:00
|
|
|
}
|
2014-08-10 13:40:41 +02:00
|
|
|
}
|
2014-12-07 15:24:19 +01:00
|
|
|
|
|
|
|
write.recycle();
|
2017-08-11 07:51:45 +02:00
|
|
|
tracker.decrementPendingOutboundBytes(writeSize);
|
2014-08-10 13:40:41 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
private static void safeFail(ChannelPromise promise, Throwable cause) {
|
|
|
|
if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
|
|
|
|
logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Holds all meta-data and construct the linked-list structure.
|
|
|
|
*/
|
|
|
|
static final class PendingWrite {
|
|
|
|
private static final Recycler<PendingWrite> RECYCLER = new Recycler<PendingWrite>() {
|
|
|
|
@Override
|
2016-05-17 09:31:58 +02:00
|
|
|
protected PendingWrite newObject(Handle<PendingWrite> handle) {
|
2014-08-10 13:40:41 +02:00
|
|
|
return new PendingWrite(handle);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2016-05-17 09:31:58 +02:00
|
|
|
private final Recycler.Handle<PendingWrite> handle;
|
2014-08-10 13:40:41 +02:00
|
|
|
private PendingWrite next;
|
|
|
|
private long size;
|
|
|
|
private ChannelPromise promise;
|
|
|
|
private Object msg;
|
|
|
|
|
2016-05-17 09:31:58 +02:00
|
|
|
private PendingWrite(Recycler.Handle<PendingWrite> handle) {
|
2014-08-10 13:40:41 +02:00
|
|
|
this.handle = handle;
|
|
|
|
}
|
|
|
|
|
|
|
|
static PendingWrite newInstance(Object msg, int size, ChannelPromise promise) {
|
|
|
|
PendingWrite write = RECYCLER.get();
|
|
|
|
write.size = size;
|
|
|
|
write.msg = msg;
|
|
|
|
write.promise = promise;
|
|
|
|
return write;
|
|
|
|
}
|
|
|
|
|
|
|
|
private void recycle() {
|
|
|
|
size = 0;
|
|
|
|
next = null;
|
|
|
|
msg = null;
|
|
|
|
promise = null;
|
2016-05-17 09:31:58 +02:00
|
|
|
handle.recycle(this);
|
2014-08-10 13:40:41 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|