Customizable estimation for messages written outside the EventLoop

Motivation:

Estimation algorithm currently used for WriteTasks is complicated and
wrong. Additionally, some code relies on outbound buffer size
incremented only on actual writes to the outbound buffer.

Modifications:

- Throw away the old estimator and replace with a simple algorithm that
  uses the client-provided estimator along with a statically configured
  WriteTask overhead (io.netty.transport.writeTaskSizeOverhead system
  property with the default value of 48 bytes)
- Add a io.netty.transport.estimateSizeOnSubmit boolean system property
  allowing the clients to disable the message estimation outside the
  event loop

Result:

Task estimation is user controllable and produces better results by
default
This commit is contained in:
Alexey Ermakov 2015-12-23 16:14:15 +03:00 committed by Norman Maurer
parent 80cff236e4
commit cfd6793bb7

View File

@ -16,23 +16,14 @@
package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.RecyclableMpscLinkedQueueNode;
import io.netty.util.internal.SystemPropertyUtil;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import static io.netty.channel.ChannelHandlerInvokerUtil.*;
import static io.netty.channel.DefaultChannelPipeline.*;
@ -403,91 +394,13 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
static final class WriteTask extends RecyclableMpscLinkedQueueNode<SingleThreadEventLoop.NonWakeupRunnable>
implements SingleThreadEventLoop.NonWakeupRunnable {
private static final FastThreadLocal<Map<Class<?>, Integer>> CLASS_SIZES =
new FastThreadLocal<Map<Class<?>, Integer>>() {
@Override
protected Map<Class<?>, Integer> initialValue() throws Exception {
Map<Class<?>, Integer> map = new WeakHashMap<Class<?>, Integer>();
map.put(void.class, 0);
map.put(byte.class, 1);
map.put(char.class, 2);
map.put(short.class, 2);
map.put(boolean.class, 4); // Probably an integer.
map.put(int.class, 4);
map.put(float.class, 4);
map.put(long.class, 8);
map.put(double.class, 8);
return map;
}
};
private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
private static int estimateSize(Object o, Map<Class<?>, Integer> classSizes) {
int answer = 8 + estimateSize(o.getClass(), classSizes, null);
// Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment
private static final int WRITE_TASK_OVERHEAD =
SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48);
if (o instanceof ByteBuf) {
answer += ((ByteBuf) o).readableBytes();
} else if (o instanceof ByteBufHolder) {
answer += ((ByteBufHolder) o).content().readableBytes();
} else if (o instanceof FileRegion) {
// nothing to add.
} else if (o instanceof byte[]) {
answer += ((byte[]) o).length;
} else if (o instanceof ByteBuffer) {
answer += ((ByteBuffer) o).remaining();
} else if (o instanceof CharSequence) {
answer += ((CharSequence) o).length() << 1;
} else if (o instanceof Iterable<?>) {
for (Object m : (Iterable<?>) o) {
answer += estimateSize(m, classSizes);
}
}
return align(answer);
}
private static int estimateSize(Class<?> clazz, Map<Class<?>, Integer> classSizes,
Set<Class<?>> visitedClasses) {
Integer objectSize = classSizes.get(clazz);
if (objectSize != null) {
return objectSize;
}
if (visitedClasses != null) {
if (visitedClasses.contains(clazz)) {
return 0;
}
} else {
visitedClasses = new HashSet<Class<?>>();
}
visitedClasses.add(clazz);
int answer = 8; // Basic overhead.
for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
Field[] fields = c.getDeclaredFields();
for (Field f : fields) {
if ((f.getModifiers() & Modifier.STATIC) != 0) {
// Ignore static fields.
continue;
}
answer += estimateSize(f.getType(), classSizes, visitedClasses);
}
}
visitedClasses.remove(clazz);
// Some alignment.
answer = align(answer);
// Put the final answer.
classSizes.put(clazz, answer);
return answer;
}
private static int align(int size) {
return size + 8 - (size & 7);
}
private ChannelHandlerContext ctx;
private Object msg;
private ChannelPromise promise;
@ -506,13 +419,21 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
task.ctx = ctx;
task.msg = msg;
task.promise = promise;
task.size = ((AbstractChannel) ctx.channel()).estimatorHandle().size(msg)
+ estimateSize(task, CLASS_SIZES.get());
ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.incrementPendingOutboundBytes(task.size);
if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
task.size = ((AbstractChannel) ctx.channel()).estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
buffer.incrementPendingOutboundBytes(task.size);
} else {
task.size = 0;
}
} else {
task.size = 0;
}
return task;
}
@ -525,7 +446,7 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
try {
ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
if (ESTIMATE_TASK_SIZE_ON_SUBMIT && buffer != null) {
buffer.decrementPendingOutboundBytes(size);
}
invokeWriteNow(ctx, msg, promise);