diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerInvoker.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerInvoker.java index 3d536b1d3e..624b405b4b 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerInvoker.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerInvoker.java @@ -16,23 +16,14 @@ package io.netty.channel; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufHolder; import io.netty.util.Recycler; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.RecyclableMpscLinkedQueueNode; +import io.netty.util.internal.SystemPropertyUtil; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.WeakHashMap; import static io.netty.channel.ChannelHandlerInvokerUtil.*; import static io.netty.channel.DefaultChannelPipeline.*; @@ -403,91 +394,13 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker { static final class WriteTask extends RecyclableMpscLinkedQueueNode implements SingleThreadEventLoop.NonWakeupRunnable { - private static final FastThreadLocal, Integer>> CLASS_SIZES = - new FastThreadLocal, Integer>>() { - @Override - protected Map, Integer> initialValue() throws Exception { - Map, Integer> map = new WeakHashMap, Integer>(); - map.put(void.class, 0); - map.put(byte.class, 1); - map.put(char.class, 2); - map.put(short.class, 2); - map.put(boolean.class, 4); // Probably an integer. - map.put(int.class, 4); - map.put(float.class, 4); - map.put(long.class, 8); - map.put(double.class, 8); - return map; - } - }; + private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT = + SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true); - private static int estimateSize(Object o, Map, Integer> classSizes) { - int answer = 8 + estimateSize(o.getClass(), classSizes, null); + // Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment + private static final int WRITE_TASK_OVERHEAD = + SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48); - if (o instanceof ByteBuf) { - answer += ((ByteBuf) o).readableBytes(); - } else if (o instanceof ByteBufHolder) { - answer += ((ByteBufHolder) o).content().readableBytes(); - } else if (o instanceof FileRegion) { - // nothing to add. - } else if (o instanceof byte[]) { - answer += ((byte[]) o).length; - } else if (o instanceof ByteBuffer) { - answer += ((ByteBuffer) o).remaining(); - } else if (o instanceof CharSequence) { - answer += ((CharSequence) o).length() << 1; - } else if (o instanceof Iterable) { - for (Object m : (Iterable) o) { - answer += estimateSize(m, classSizes); - } - } - - return align(answer); - } - - private static int estimateSize(Class clazz, Map, Integer> classSizes, - Set> visitedClasses) { - Integer objectSize = classSizes.get(clazz); - if (objectSize != null) { - return objectSize; - } - - if (visitedClasses != null) { - if (visitedClasses.contains(clazz)) { - return 0; - } - } else { - visitedClasses = new HashSet>(); - } - - visitedClasses.add(clazz); - - int answer = 8; // Basic overhead. - for (Class c = clazz; c != null; c = c.getSuperclass()) { - Field[] fields = c.getDeclaredFields(); - for (Field f : fields) { - if ((f.getModifiers() & Modifier.STATIC) != 0) { - // Ignore static fields. - continue; - } - - answer += estimateSize(f.getType(), classSizes, visitedClasses); - } - } - - visitedClasses.remove(clazz); - - // Some alignment. - answer = align(answer); - - // Put the final answer. - classSizes.put(clazz, answer); - return answer; - } - - private static int align(int size) { - return size + 8 - (size & 7); - } private ChannelHandlerContext ctx; private Object msg; private ChannelPromise promise; @@ -506,13 +419,21 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker { task.ctx = ctx; task.msg = msg; task.promise = promise; - task.size = ((AbstractChannel) ctx.channel()).estimatorHandle().size(msg) - + estimateSize(task, CLASS_SIZES.get()); - ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer(); - // Check for null as it may be set to null if the channel is closed already - if (buffer != null) { - buffer.incrementPendingOutboundBytes(task.size); + + if (ESTIMATE_TASK_SIZE_ON_SUBMIT) { + ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer(); + + // Check for null as it may be set to null if the channel is closed already + if (buffer != null) { + task.size = ((AbstractChannel) ctx.channel()).estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD; + buffer.incrementPendingOutboundBytes(task.size); + } else { + task.size = 0; + } + } else { + task.size = 0; } + return task; } @@ -525,7 +446,7 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker { try { ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer(); // Check for null as it may be set to null if the channel is closed already - if (buffer != null) { + if (ESTIMATE_TASK_SIZE_ON_SUBMIT && buffer != null) { buffer.decrementPendingOutboundBytes(size); } invokeWriteNow(ctx, msg, promise);