Optimize WriteTask recycling (#9800)

Motivation

AbstractChannelHandlerContext uses recyclable tasks when performing
writes from outside of the event loop. There's currently two distinct
classes WriteTask and WriteAndFlushTask used for executing writes versus
writeAndFlushes, and these are recycled in separate pools. However it is
straightforward to just have a single class / recycler pool, with a
flush flag.

Modifications

- Unify WriteTasks into a single class using the sign bit of the
existing size field to indicate whether a flush should be performed
- Use the new executor lazyExecute() method to lazily execute the
non-flush write tasks explicitly
- Change AbstractChannelHandlerContext#invokeWrite and
AbstractChannelHandlerContext#invokeWriteAndFlush from private to
package-private to avoid synthetic methods
- Correct the default object size estimate for WriteTask

Results

- Possibly improved reuse of recycled write tasks
- Fewer virtual method calls and shorter path lengths
- Less code
This commit is contained in:
Nick Hill 2019-11-23 11:45:35 -08:00 committed by Norman Maurer
parent 43252a6135
commit b413464091

View File

@ -495,7 +495,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}, promise, null, false);
}
return promise;
}
@ -539,7 +539,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}, promise, null, false);
}
return promise;
}
@ -578,7 +578,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
public void run() {
next.invokeDisconnect(promise);
}
}, promise, null);
}, promise, null, false);
}
return promise;
}
@ -612,7 +612,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
public void run() {
next.invokeClose(promise);
}
}, promise, null);
}, promise, null, false);
}
return promise;
@ -647,7 +647,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
public void run() {
next.invokeDeregister(promise);
}
}, promise, null);
}, promise, null, false);
}
return promise;
@ -706,7 +706,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
return promise;
}
private void invokeWrite(Object msg, ChannelPromise promise) {
void invokeWrite(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
} else {
@ -733,7 +733,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
if (tasks == null) {
next.invokeTasks = tasks = new Tasks(next);
}
safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null);
safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false);
}
return this;
@ -761,7 +761,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
return promise;
}
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
@ -794,14 +794,9 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
next.invokeWrite(m, promise);
}
} else {
final AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
if (!safeExecute(executor, task, promise, m)) {
// We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
// We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
// and put it back in the Recycler for re-use later.
//
// See https://github.com/netty/netty/issues/8343.
@ -1009,9 +1004,14 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
return channel().hasAttr(key);
}
private static boolean safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
private static boolean safeExecute(EventExecutor executor, Runnable runnable,
ChannelPromise promise, Object msg, boolean lazy) {
try {
executor.execute(runnable);
if (lazy && executor instanceof AbstractEventExecutor) {
((AbstractEventExecutor) executor).lazyExecute(runnable);
} else {
executor.execute(runnable);
}
return true;
} catch (Throwable cause) {
try {
@ -1035,28 +1035,41 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
}
abstract static class AbstractWriteTask implements Runnable {
static final class WriteTask implements Runnable {
private static final ObjectPool<WriteTask> RECYCLER = ObjectPool.newPool(new ObjectCreator<WriteTask>() {
@Override
public WriteTask newObject(Handle<WriteTask> handle) {
return new WriteTask(handle);
}
});
static WriteTask newInstance(AbstractChannelHandlerContext ctx,
Object msg, ChannelPromise promise, boolean flush) {
WriteTask task = RECYCLER.get();
init(task, ctx, msg, promise, flush);
return task;
}
private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
// Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment
// Assuming compressed oops, 12 bytes obj header, 4 ref fields and one int field
private static final int WRITE_TASK_OVERHEAD =
SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48);
SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 32);
private final Handle<AbstractWriteTask> handle;
private final Handle<WriteTask> handle;
private AbstractChannelHandlerContext ctx;
private Object msg;
private ChannelPromise promise;
private int size;
private int size; // sign bit controls flush
@SuppressWarnings("unchecked")
private AbstractWriteTask(Handle<? extends AbstractWriteTask> handle) {
this.handle = (Handle<AbstractWriteTask>) handle;
private WriteTask(Handle<? extends WriteTask> handle) {
this.handle = (Handle<WriteTask>) handle;
}
protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx,
Object msg, ChannelPromise promise) {
protected static void init(WriteTask task, AbstractChannelHandlerContext ctx,
Object msg, ChannelPromise promise, boolean flush) {
task.ctx = ctx;
task.msg = msg;
task.promise = promise;
@ -1067,13 +1080,20 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
} else {
task.size = 0;
}
if (flush) {
task.size |= Integer.MIN_VALUE;
}
}
@Override
public final void run() {
public void run() {
try {
decrementPendingOutboundBytes();
write(ctx, msg, promise);
if (size >= 0) {
ctx.invokeWrite(msg, promise);
} else {
ctx.invokeWriteAndFlush(msg, promise);
}
} finally {
recycle();
}
@ -1089,7 +1109,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
private void decrementPendingOutboundBytes() {
if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
ctx.pipeline.decrementPendingOutboundBytes(size);
ctx.pipeline.decrementPendingOutboundBytes(size >= 0 ? size : (size & Integer.MAX_VALUE));
}
}
@ -1100,59 +1120,6 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
promise = null;
handle.recycle(this);
}
protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ctx.invokeWrite(msg, promise);
}
}
static final class WriteTask extends AbstractWriteTask implements AbstractEventExecutor.LazyRunnable {
private static final ObjectPool<WriteTask> RECYCLER = ObjectPool.newPool(new ObjectCreator<WriteTask>() {
@Override
public WriteTask newObject(Handle<WriteTask> handle) {
return new WriteTask(handle);
}
});
static WriteTask newInstance(
AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
WriteTask task = RECYCLER.get();
init(task, ctx, msg, promise);
return task;
}
private WriteTask(Handle<WriteTask> handle) {
super(handle);
}
}
static final class WriteAndFlushTask extends AbstractWriteTask {
private static final ObjectPool<WriteAndFlushTask> RECYCLER = ObjectPool.newPool(
new ObjectCreator<WriteAndFlushTask>() {
@Override
public WriteAndFlushTask newObject(Handle<WriteAndFlushTask> handle) {
return new WriteAndFlushTask(handle);
}
});
static WriteAndFlushTask newInstance(
AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
WriteAndFlushTask task = RECYCLER.get();
init(task, ctx, msg, promise);
return task;
}
private WriteAndFlushTask(Handle<WriteAndFlushTask> handle) {
super(handle);
}
@Override
public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
super.write(ctx, msg, promise);
ctx.invokeFlush();
}
}
private static final class Tasks {