[#4363] Improve size calculation of messages when written from outside the EventLoop
Motiviation: If a user writes from outside the EventLoop we increase the pending bytes of the outbound buffer before submitting the write request. This is done so the user can stop writing asap once the channel turns unwritable. Unfortunally this doesn't take the overhead of adding the task into the account and so it is very easy for an user to full up the task queue. Beside this we use a value of 0 for an unown message by default which is not ideal. Modifications: - port the message calculation we used in netty 3.x into AbstractChannelHandlerContext and so better calculate the overhead of a message that is submitted from outside the EventLoop - change the default estimated size for an unknown message to 8. Result: Better behaviour when submiting writes from outside the EventLoop.
This commit is contained in:
parent
e121c68e0f
commit
2100a83e30
@ -16,13 +16,23 @@
|
||||
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufHolder;
|
||||
import io.netty.util.Recycler;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.concurrent.FastThreadLocal;
|
||||
import io.netty.util.internal.OneTimeTask;
|
||||
import io.netty.util.internal.RecyclableMpscLinkedQueueNode;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Modifier;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.WeakHashMap;
|
||||
|
||||
import static io.netty.channel.ChannelHandlerInvokerUtil.*;
|
||||
import static io.netty.channel.DefaultChannelPipeline.*;
|
||||
@ -336,16 +346,7 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
|
||||
if (executor.inEventLoop()) {
|
||||
invokeWriteNow(ctx, msg, promise);
|
||||
} else {
|
||||
AbstractChannel channel = (AbstractChannel) ctx.channel();
|
||||
int size = channel.estimatorHandle().size(msg);
|
||||
if (size > 0) {
|
||||
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
|
||||
// Check for null as it may be set to null if the channel is closed already
|
||||
if (buffer != null) {
|
||||
buffer.incrementPendingOutboundBytes(size);
|
||||
}
|
||||
}
|
||||
safeExecuteOutbound(WriteTask.newInstance(ctx, msg, size, promise), promise, msg);
|
||||
safeExecuteOutbound(WriteTask.newInstance(ctx, msg, promise), promise, msg);
|
||||
}
|
||||
}
|
||||
|
||||
@ -401,6 +402,92 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
|
||||
|
||||
static final class WriteTask extends RecyclableMpscLinkedQueueNode<SingleThreadEventLoop.NonWakeupRunnable>
|
||||
implements SingleThreadEventLoop.NonWakeupRunnable {
|
||||
|
||||
private static final FastThreadLocal<Map<Class<?>, Integer>> CLASS_SIZES =
|
||||
new FastThreadLocal<Map<Class<?>, Integer>>() {
|
||||
@Override
|
||||
protected Map<Class<?>, Integer> initialValue() throws Exception {
|
||||
Map<Class<?>, Integer> map = new WeakHashMap<Class<?>, Integer>();
|
||||
map.put(void.class, 0);
|
||||
map.put(byte.class, 1);
|
||||
map.put(char.class, 2);
|
||||
map.put(short.class, 2);
|
||||
map.put(boolean.class, 4); // Probably an integer.
|
||||
map.put(int.class, 4);
|
||||
map.put(float.class, 4);
|
||||
map.put(long.class, 8);
|
||||
map.put(double.class, 8);
|
||||
return map;
|
||||
}
|
||||
};
|
||||
|
||||
private static int estimateSize(Object o, Map<Class<?>, Integer> classSizes) {
|
||||
int answer = 8 + estimateSize(o.getClass(), classSizes, null);
|
||||
|
||||
if (o instanceof ByteBuf) {
|
||||
answer += ((ByteBuf) o).readableBytes();
|
||||
} else if (o instanceof ByteBufHolder) {
|
||||
answer += ((ByteBufHolder) o).content().readableBytes();
|
||||
} else if (o instanceof FileRegion) {
|
||||
// nothing to add.
|
||||
} else if (o instanceof byte[]) {
|
||||
answer += ((byte[]) o).length;
|
||||
} else if (o instanceof ByteBuffer) {
|
||||
answer += ((ByteBuffer) o).remaining();
|
||||
} else if (o instanceof CharSequence) {
|
||||
answer += ((CharSequence) o).length() << 1;
|
||||
} else if (o instanceof Iterable<?>) {
|
||||
for (Object m : (Iterable<?>) o) {
|
||||
answer += estimateSize(m, classSizes);
|
||||
}
|
||||
}
|
||||
|
||||
return align(answer);
|
||||
}
|
||||
|
||||
private static int estimateSize(Class<?> clazz, Map<Class<?>, Integer> classSizes,
|
||||
Set<Class<?>> visitedClasses) {
|
||||
Integer objectSize = classSizes.get(clazz);
|
||||
if (objectSize != null) {
|
||||
return objectSize;
|
||||
}
|
||||
|
||||
if (visitedClasses != null) {
|
||||
if (visitedClasses.contains(clazz)) {
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
visitedClasses = new HashSet<Class<?>>();
|
||||
}
|
||||
|
||||
visitedClasses.add(clazz);
|
||||
|
||||
int answer = 8; // Basic overhead.
|
||||
for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
|
||||
Field[] fields = c.getDeclaredFields();
|
||||
for (Field f : fields) {
|
||||
if ((f.getModifiers() & Modifier.STATIC) != 0) {
|
||||
// Ignore static fields.
|
||||
continue;
|
||||
}
|
||||
|
||||
answer += estimateSize(f.getType(), classSizes, visitedClasses);
|
||||
}
|
||||
}
|
||||
|
||||
visitedClasses.remove(clazz);
|
||||
|
||||
// Some alignment.
|
||||
answer = align(answer);
|
||||
|
||||
// Put the final answer.
|
||||
classSizes.put(clazz, answer);
|
||||
return answer;
|
||||
}
|
||||
|
||||
private static int align(int size) {
|
||||
return size + 8 - (size & 7);
|
||||
}
|
||||
private ChannelHandlerContext ctx;
|
||||
private Object msg;
|
||||
private ChannelPromise promise;
|
||||
@ -414,12 +501,18 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
|
||||
};
|
||||
|
||||
private static WriteTask newInstance(
|
||||
ChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) {
|
||||
ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
|
||||
WriteTask task = RECYCLER.get();
|
||||
task.ctx = ctx;
|
||||
task.msg = msg;
|
||||
task.promise = promise;
|
||||
task.size = size;
|
||||
task.size = ((AbstractChannel) ctx.channel()).estimatorHandle().size(msg)
|
||||
+ estimateSize(task, CLASS_SIZES.get());
|
||||
ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
|
||||
// Check for null as it may be set to null if the channel is closed already
|
||||
if (buffer != null) {
|
||||
buffer.incrementPendingOutboundBytes(task.size);
|
||||
}
|
||||
return task;
|
||||
}
|
||||
|
||||
@ -430,12 +523,10 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
if (size > 0) {
|
||||
ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
|
||||
// Check for null as it may be set to null if the channel is closed already
|
||||
if (buffer != null) {
|
||||
buffer.decrementPendingOutboundBytes(size);
|
||||
}
|
||||
ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
|
||||
// Check for null as it may be set to null if the channel is closed already
|
||||
if (buffer != null) {
|
||||
buffer.decrementPendingOutboundBytes(size);
|
||||
}
|
||||
invokeWriteNow(ctx, msg, promise);
|
||||
} finally {
|
||||
|
@ -47,9 +47,9 @@ public final class DefaultMessageSizeEstimator implements MessageSizeEstimator {
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the default implementation which returns {@code -1} for unknown messages.
|
||||
* Return the default implementation which returns {@code 8} for unknown messages.
|
||||
*/
|
||||
public static final MessageSizeEstimator DEFAULT = new DefaultMessageSizeEstimator(0);
|
||||
public static final MessageSizeEstimator DEFAULT = new DefaultMessageSizeEstimator(8);
|
||||
|
||||
private final Handle handle;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user