diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index b38bacfbd2..5a7350f9ca 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -495,7 +495,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R public void run() { next.invokeBind(localAddress, promise); } - }, promise, null); + }, promise, null, false); } return promise; } @@ -539,7 +539,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R public void run() { next.invokeConnect(remoteAddress, localAddress, promise); } - }, promise, null); + }, promise, null, false); } return promise; } @@ -578,7 +578,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R public void run() { next.invokeDisconnect(promise); } - }, promise, null); + }, promise, null, false); } return promise; } @@ -612,7 +612,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R public void run() { next.invokeClose(promise); } - }, promise, null); + }, promise, null, false); } return promise; @@ -647,7 +647,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R public void run() { next.invokeDeregister(promise); } - }, promise, null); + }, promise, null, false); } return promise; @@ -706,7 +706,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R return promise; } - private void invokeWrite(Object msg, ChannelPromise promise) { + void invokeWrite(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); } else { @@ -733,7 +733,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R if (tasks == null) { next.invokeTasks = tasks = new Tasks(next); } - safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null); + safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false); } return this; @@ -761,7 +761,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R return promise; } - private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { + void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); invokeFlush0(); @@ -794,14 +794,9 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R next.invokeWrite(m, promise); } } else { - final AbstractWriteTask task; - if (flush) { - task = WriteAndFlushTask.newInstance(next, m, promise); - } else { - task = WriteTask.newInstance(next, m, promise); - } - if (!safeExecute(executor, task, promise, m)) { - // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes + final WriteTask task = WriteTask.newInstance(next, m, promise, flush); + if (!safeExecute(executor, task, promise, m, !flush)) { + // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes // and put it back in the Recycler for re-use later. // // See https://github.com/netty/netty/issues/8343. @@ -1009,9 +1004,14 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R return channel().hasAttr(key); } - private static boolean safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) { + private static boolean safeExecute(EventExecutor executor, Runnable runnable, + ChannelPromise promise, Object msg, boolean lazy) { try { - executor.execute(runnable); + if (lazy && executor instanceof AbstractEventExecutor) { + ((AbstractEventExecutor) executor).lazyExecute(runnable); + } else { + executor.execute(runnable); + } return true; } catch (Throwable cause) { try { @@ -1035,28 +1035,41 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')'; } - abstract static class AbstractWriteTask implements Runnable { + static final class WriteTask implements Runnable { + private static final ObjectPool RECYCLER = ObjectPool.newPool(new ObjectCreator() { + @Override + public WriteTask newObject(Handle handle) { + return new WriteTask(handle); + } + }); + + static WriteTask newInstance(AbstractChannelHandlerContext ctx, + Object msg, ChannelPromise promise, boolean flush) { + WriteTask task = RECYCLER.get(); + init(task, ctx, msg, promise, flush); + return task; + } private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT = SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true); - // Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment + // Assuming compressed oops, 12 bytes obj header, 4 ref fields and one int field private static final int WRITE_TASK_OVERHEAD = - SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48); + SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 32); - private final Handle handle; + private final Handle handle; private AbstractChannelHandlerContext ctx; private Object msg; private ChannelPromise promise; - private int size; + private int size; // sign bit controls flush @SuppressWarnings("unchecked") - private AbstractWriteTask(Handle handle) { - this.handle = (Handle) handle; + private WriteTask(Handle handle) { + this.handle = (Handle) handle; } - protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx, - Object msg, ChannelPromise promise) { + protected static void init(WriteTask task, AbstractChannelHandlerContext ctx, + Object msg, ChannelPromise promise, boolean flush) { task.ctx = ctx; task.msg = msg; task.promise = promise; @@ -1067,13 +1080,20 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R } else { task.size = 0; } + if (flush) { + task.size |= Integer.MIN_VALUE; + } } @Override - public final void run() { + public void run() { try { decrementPendingOutboundBytes(); - write(ctx, msg, promise); + if (size >= 0) { + ctx.invokeWrite(msg, promise); + } else { + ctx.invokeWriteAndFlush(msg, promise); + } } finally { recycle(); } @@ -1089,7 +1109,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R private void decrementPendingOutboundBytes() { if (ESTIMATE_TASK_SIZE_ON_SUBMIT) { - ctx.pipeline.decrementPendingOutboundBytes(size); + ctx.pipeline.decrementPendingOutboundBytes(size >= 0 ? size : (size & Integer.MAX_VALUE)); } } @@ -1100,59 +1120,6 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R promise = null; handle.recycle(this); } - - protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - ctx.invokeWrite(msg, promise); - } - } - - static final class WriteTask extends AbstractWriteTask implements AbstractEventExecutor.LazyRunnable { - - private static final ObjectPool RECYCLER = ObjectPool.newPool(new ObjectCreator() { - @Override - public WriteTask newObject(Handle handle) { - return new WriteTask(handle); - } - }); - - static WriteTask newInstance( - AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - WriteTask task = RECYCLER.get(); - init(task, ctx, msg, promise); - return task; - } - - private WriteTask(Handle handle) { - super(handle); - } - } - - static final class WriteAndFlushTask extends AbstractWriteTask { - - private static final ObjectPool RECYCLER = ObjectPool.newPool( - new ObjectCreator() { - @Override - public WriteAndFlushTask newObject(Handle handle) { - return new WriteAndFlushTask(handle); - } - }); - - static WriteAndFlushTask newInstance( - AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - WriteAndFlushTask task = RECYCLER.get(); - init(task, ctx, msg, promise); - return task; - } - - private WriteAndFlushTask(Handle handle) { - super(handle); - } - - @Override - public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - super.write(ctx, msg, promise); - ctx.invokeFlush(); - } } private static final class Tasks {