From b413464091608c3bad00f4d072aaad29bac60554 Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Sat, 23 Nov 2019 11:45:35 -0800 Subject: [PATCH] Optimize WriteTask recycling (#9800) Motivation AbstractChannelHandlerContext uses recyclable tasks when performing writes from outside of the event loop. There's currently two distinct classes WriteTask and WriteAndFlushTask used for executing writes versus writeAndFlushes, and these are recycled in separate pools. However it is straightforward to just have a single class / recycler pool, with a flush flag. Modifications - Unify WriteTasks into a single class using the sign bit of the existing size field to indicate whether a flush should be performed - Use the new executor lazyExecute() method to lazily execute the non-flush write tasks explicitly - Change AbstractChannelHandlerContext#invokeWrite and AbstractChannelHandlerContext#invokeWriteAndFlush from private to package-private to avoid synthetic methods - Correct the default object size estimate for WriteTask Results - Possibly improved reuse of recycled write tasks - Fewer virtual method calls and shorter path lengths - Less code --- .../AbstractChannelHandlerContext.java | 133 +++++++----------- 1 file changed, 50 insertions(+), 83 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index b38bacfbd2..5a7350f9ca 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -495,7 +495,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R public void run() { next.invokeBind(localAddress, promise); } - }, promise, null); + }, promise, null, false); } return promise; } @@ -539,7 +539,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R public void run() { next.invokeConnect(remoteAddress, localAddress, promise); } - }, promise, null); + }, promise, null, false); } return promise; } @@ -578,7 +578,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R public void run() { next.invokeDisconnect(promise); } - }, promise, null); + }, promise, null, false); } return promise; } @@ -612,7 +612,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R public void run() { next.invokeClose(promise); } - }, promise, null); + }, promise, null, false); } return promise; @@ -647,7 +647,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R public void run() { next.invokeDeregister(promise); } - }, promise, null); + }, promise, null, false); } return promise; @@ -706,7 +706,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R return promise; } - private void invokeWrite(Object msg, ChannelPromise promise) { + void invokeWrite(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); } else { @@ -733,7 +733,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R if (tasks == null) { next.invokeTasks = tasks = new Tasks(next); } - safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null); + safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false); } return this; @@ -761,7 +761,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R return promise; } - private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { + void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); invokeFlush0(); @@ -794,14 +794,9 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R next.invokeWrite(m, promise); } } else { - final AbstractWriteTask task; - if (flush) { - task = WriteAndFlushTask.newInstance(next, m, promise); - } else { - task = WriteTask.newInstance(next, m, promise); - } - if (!safeExecute(executor, task, promise, m)) { - // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes + final WriteTask task = WriteTask.newInstance(next, m, promise, flush); + if (!safeExecute(executor, task, promise, m, !flush)) { + // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes // and put it back in the Recycler for re-use later. // // See https://github.com/netty/netty/issues/8343. @@ -1009,9 +1004,14 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R return channel().hasAttr(key); } - private static boolean safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) { + private static boolean safeExecute(EventExecutor executor, Runnable runnable, + ChannelPromise promise, Object msg, boolean lazy) { try { - executor.execute(runnable); + if (lazy && executor instanceof AbstractEventExecutor) { + ((AbstractEventExecutor) executor).lazyExecute(runnable); + } else { + executor.execute(runnable); + } return true; } catch (Throwable cause) { try { @@ -1035,28 +1035,41 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')'; } - abstract static class AbstractWriteTask implements Runnable { + static final class WriteTask implements Runnable { + private static final ObjectPool RECYCLER = ObjectPool.newPool(new ObjectCreator() { + @Override + public WriteTask newObject(Handle handle) { + return new WriteTask(handle); + } + }); + + static WriteTask newInstance(AbstractChannelHandlerContext ctx, + Object msg, ChannelPromise promise, boolean flush) { + WriteTask task = RECYCLER.get(); + init(task, ctx, msg, promise, flush); + return task; + } private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT = SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true); - // Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment + // Assuming compressed oops, 12 bytes obj header, 4 ref fields and one int field private static final int WRITE_TASK_OVERHEAD = - SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48); + SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 32); - private final Handle handle; + private final Handle handle; private AbstractChannelHandlerContext ctx; private Object msg; private ChannelPromise promise; - private int size; + private int size; // sign bit controls flush @SuppressWarnings("unchecked") - private AbstractWriteTask(Handle handle) { - this.handle = (Handle) handle; + private WriteTask(Handle handle) { + this.handle = (Handle) handle; } - protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx, - Object msg, ChannelPromise promise) { + protected static void init(WriteTask task, AbstractChannelHandlerContext ctx, + Object msg, ChannelPromise promise, boolean flush) { task.ctx = ctx; task.msg = msg; task.promise = promise; @@ -1067,13 +1080,20 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R } else { task.size = 0; } + if (flush) { + task.size |= Integer.MIN_VALUE; + } } @Override - public final void run() { + public void run() { try { decrementPendingOutboundBytes(); - write(ctx, msg, promise); + if (size >= 0) { + ctx.invokeWrite(msg, promise); + } else { + ctx.invokeWriteAndFlush(msg, promise); + } } finally { recycle(); } @@ -1089,7 +1109,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R private void decrementPendingOutboundBytes() { if (ESTIMATE_TASK_SIZE_ON_SUBMIT) { - ctx.pipeline.decrementPendingOutboundBytes(size); + ctx.pipeline.decrementPendingOutboundBytes(size >= 0 ? size : (size & Integer.MAX_VALUE)); } } @@ -1100,59 +1120,6 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R promise = null; handle.recycle(this); } - - protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - ctx.invokeWrite(msg, promise); - } - } - - static final class WriteTask extends AbstractWriteTask implements AbstractEventExecutor.LazyRunnable { - - private static final ObjectPool RECYCLER = ObjectPool.newPool(new ObjectCreator() { - @Override - public WriteTask newObject(Handle handle) { - return new WriteTask(handle); - } - }); - - static WriteTask newInstance( - AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - WriteTask task = RECYCLER.get(); - init(task, ctx, msg, promise); - return task; - } - - private WriteTask(Handle handle) { - super(handle); - } - } - - static final class WriteAndFlushTask extends AbstractWriteTask { - - private static final ObjectPool RECYCLER = ObjectPool.newPool( - new ObjectCreator() { - @Override - public WriteAndFlushTask newObject(Handle handle) { - return new WriteAndFlushTask(handle); - } - }); - - static WriteAndFlushTask newInstance( - AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - WriteAndFlushTask task = RECYCLER.get(); - init(task, ctx, msg, promise); - return task; - } - - private WriteAndFlushTask(Handle handle) { - super(handle); - } - - @Override - public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - super.write(ctx, msg, promise); - ctx.invokeFlush(); - } } private static final class Tasks {