diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index 514f614e92..94c1d300c8 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -15,17 +15,27 @@ */ package io.netty.channel; +import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufHolder; import io.netty.util.DefaultAttributeMap; import io.netty.util.Recycler; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.RecyclableMpscLinkedQueueNode; import io.netty.util.internal.StringUtil; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.WeakHashMap; import static io.netty.channel.DefaultChannelPipeline.*; @@ -684,7 +694,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } private void write(Object msg, boolean flush, ChannelPromise promise) { - AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { @@ -693,19 +702,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme next.invokeFlush(); } } else { - int size = channel.estimatorHandle().size(msg); - if (size > 0) { - ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer(); - // Check for null as it may be set to null if the channel is closed already - if (buffer != null) { - buffer.incrementPendingOutboundBytes(size); - } - } - Runnable task; + AbstractWriteTask task; if (flush) { - task = WriteAndFlushTask.newInstance(next, msg, size, promise); + task = WriteAndFlushTask.newInstance(next, msg, promise); } else { - task = WriteTask.newInstance(next, msg, size, promise); + task = WriteTask.newInstance(next, msg, promise); } safeExecute(executor, task, promise, msg); } @@ -863,6 +864,25 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } abstract static class AbstractWriteTask extends RecyclableMpscLinkedQueueNode implements Runnable { + + private static final FastThreadLocal, Integer>> CLASS_SIZES = + new FastThreadLocal, Integer>>() { + @Override + protected Map, Integer> initialValue() throws Exception { + Map, Integer> map = new WeakHashMap, Integer>(); + map.put(void.class, 0); + map.put(byte.class, 1); + map.put(char.class, 2); + map.put(short.class, 2); + map.put(boolean.class, 4); // Probably an integer. + map.put(int.class, 4); + map.put(float.class, 4); + map.put(long.class, 8); + map.put(double.class, 8); + return map; + } + }; + private AbstractChannelHandlerContext ctx; private Object msg; private ChannelPromise promise; @@ -872,12 +892,86 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme super(handle); } + private static int estimateSize(Object o, Map, Integer> classSizes) { + int answer = 8 + estimateSize(o.getClass(), classSizes, null); + + if (o instanceof ByteBuf) { + answer += ((ByteBuf) o).readableBytes(); + } else if (o instanceof ByteBufHolder) { + answer += ((ByteBufHolder) o).content().readableBytes(); + } else if (o instanceof FileRegion) { + // nothing to add. + } else if (o instanceof byte[]) { + answer += ((byte[]) o).length; + } else if (o instanceof ByteBuffer) { + answer += ((ByteBuffer) o).remaining(); + } else if (o instanceof CharSequence) { + answer += ((CharSequence) o).length() << 1; + } else if (o instanceof Iterable) { + for (Object m : (Iterable) o) { + answer += estimateSize(m, classSizes); + } + } + + return align(answer); + } + + private static int estimateSize(Class clazz, Map, Integer> classSizes, + Set> visitedClasses) { + Integer objectSize = classSizes.get(clazz); + if (objectSize != null) { + return objectSize; + } + + if (visitedClasses != null) { + if (visitedClasses.contains(clazz)) { + return 0; + } + } else { + visitedClasses = new HashSet>(); + } + + visitedClasses.add(clazz); + + int answer = 8; // Basic overhead. + for (Class c = clazz; c != null; c = c.getSuperclass()) { + Field[] fields = c.getDeclaredFields(); + for (Field f : fields) { + if ((f.getModifiers() & Modifier.STATIC) != 0) { + // Ignore static fields. + continue; + } + + answer += estimateSize(f.getType(), classSizes, visitedClasses); + } + } + + visitedClasses.remove(clazz); + + // Some alignment. + answer = align(answer); + + // Put the final answer. + classSizes.put(clazz, answer); + return answer; + } + + private static int align(int size) { + return size + 8 - (size & 7); + } + protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx, - Object msg, int size, ChannelPromise promise) { + Object msg, ChannelPromise promise) { task.ctx = ctx; task.msg = msg; task.promise = promise; - task.size = size; + task.size = ctx.channel.estimatorHandle().size(msg) + estimateSize(task, CLASS_SIZES.get()); + + ChannelOutboundBuffer buffer = ctx.channel.unsafe().outboundBuffer(); + // Check for null as it may be set to null if the channel is closed already + if (buffer != null) { + buffer.incrementPendingOutboundBytes(task.size); + } } @Override @@ -919,9 +1013,9 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme }; private static WriteTask newInstance( - AbstractChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) { + AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { WriteTask task = RECYCLER.get(); - init(task, ctx, msg, size, promise); + init(task, ctx, msg, promise); return task; } @@ -945,9 +1039,9 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme }; private static WriteAndFlushTask newInstance( - AbstractChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) { + AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { WriteAndFlushTask task = RECYCLER.get(); - init(task, ctx, msg, size, promise); + init(task, ctx, msg, promise); return task; } diff --git a/transport/src/main/java/io/netty/channel/DefaultMessageSizeEstimator.java b/transport/src/main/java/io/netty/channel/DefaultMessageSizeEstimator.java index 19193e86e7..1459743259 100644 --- a/transport/src/main/java/io/netty/channel/DefaultMessageSizeEstimator.java +++ b/transport/src/main/java/io/netty/channel/DefaultMessageSizeEstimator.java @@ -47,9 +47,9 @@ public final class DefaultMessageSizeEstimator implements MessageSizeEstimator { } /** - * Return the default implementation which returns {@code -1} for unknown messages. + * Return the default implementation which returns {@code 8} for unknown messages. */ - public static final MessageSizeEstimator DEFAULT = new DefaultMessageSizeEstimator(0); + public static final MessageSizeEstimator DEFAULT = new DefaultMessageSizeEstimator(8); private final Handle handle;