[#4363] Improve size calculation of messages when written from outside the EventLoop
Motiviation: If a user writes from outside the EventLoop we increase the pending bytes of the outbound buffer before submitting the write request. This is done so the user can stop writing asap once the channel turns unwritable. Unfortunally this doesn't take the overhead of adding the task into the account and so it is very easy for an user to full up the task queue. Beside this we use a value of 0 for an unown message by default which is not ideal. Modifications: - port the message calculation we used in netty 3.x into AbstractChannelHandlerContext and so better calculate the overhead of a message that is submitted from outside the EventLoop - change the default estimated size for an unknown message to 8. Result: Better behaviour when submiting writes from outside the EventLoop.
This commit is contained in:
parent
3b2648c3d1
commit
c42d710c6e
@ -15,17 +15,27 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.channel;
|
package io.netty.channel;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.ByteBufAllocator;
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
|
import io.netty.buffer.ByteBufHolder;
|
||||||
import io.netty.util.DefaultAttributeMap;
|
import io.netty.util.DefaultAttributeMap;
|
||||||
import io.netty.util.Recycler;
|
import io.netty.util.Recycler;
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import io.netty.util.concurrent.EventExecutor;
|
import io.netty.util.concurrent.EventExecutor;
|
||||||
import io.netty.util.concurrent.EventExecutorGroup;
|
import io.netty.util.concurrent.EventExecutorGroup;
|
||||||
|
import io.netty.util.concurrent.FastThreadLocal;
|
||||||
import io.netty.util.internal.OneTimeTask;
|
import io.netty.util.internal.OneTimeTask;
|
||||||
import io.netty.util.internal.RecyclableMpscLinkedQueueNode;
|
import io.netty.util.internal.RecyclableMpscLinkedQueueNode;
|
||||||
import io.netty.util.internal.StringUtil;
|
import io.netty.util.internal.StringUtil;
|
||||||
|
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
import java.lang.reflect.Modifier;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.WeakHashMap;
|
||||||
|
|
||||||
import static io.netty.channel.DefaultChannelPipeline.*;
|
import static io.netty.channel.DefaultChannelPipeline.*;
|
||||||
|
|
||||||
@ -684,7 +694,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void write(Object msg, boolean flush, ChannelPromise promise) {
|
private void write(Object msg, boolean flush, ChannelPromise promise) {
|
||||||
|
|
||||||
AbstractChannelHandlerContext next = findContextOutbound();
|
AbstractChannelHandlerContext next = findContextOutbound();
|
||||||
EventExecutor executor = next.executor();
|
EventExecutor executor = next.executor();
|
||||||
if (executor.inEventLoop()) {
|
if (executor.inEventLoop()) {
|
||||||
@ -693,19 +702,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
|||||||
next.invokeFlush();
|
next.invokeFlush();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
int size = channel.estimatorHandle().size(msg);
|
AbstractWriteTask task;
|
||||||
if (size > 0) {
|
|
||||||
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
|
|
||||||
// Check for null as it may be set to null if the channel is closed already
|
|
||||||
if (buffer != null) {
|
|
||||||
buffer.incrementPendingOutboundBytes(size);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Runnable task;
|
|
||||||
if (flush) {
|
if (flush) {
|
||||||
task = WriteAndFlushTask.newInstance(next, msg, size, promise);
|
task = WriteAndFlushTask.newInstance(next, msg, promise);
|
||||||
} else {
|
} else {
|
||||||
task = WriteTask.newInstance(next, msg, size, promise);
|
task = WriteTask.newInstance(next, msg, promise);
|
||||||
}
|
}
|
||||||
safeExecute(executor, task, promise, msg);
|
safeExecute(executor, task, promise, msg);
|
||||||
}
|
}
|
||||||
@ -863,6 +864,25 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
|||||||
}
|
}
|
||||||
|
|
||||||
abstract static class AbstractWriteTask extends RecyclableMpscLinkedQueueNode<Runnable> implements Runnable {
|
abstract static class AbstractWriteTask extends RecyclableMpscLinkedQueueNode<Runnable> implements Runnable {
|
||||||
|
|
||||||
|
private static final FastThreadLocal<Map<Class<?>, Integer>> CLASS_SIZES =
|
||||||
|
new FastThreadLocal<Map<Class<?>, Integer>>() {
|
||||||
|
@Override
|
||||||
|
protected Map<Class<?>, Integer> initialValue() throws Exception {
|
||||||
|
Map<Class<?>, Integer> map = new WeakHashMap<Class<?>, Integer>();
|
||||||
|
map.put(void.class, 0);
|
||||||
|
map.put(byte.class, 1);
|
||||||
|
map.put(char.class, 2);
|
||||||
|
map.put(short.class, 2);
|
||||||
|
map.put(boolean.class, 4); // Probably an integer.
|
||||||
|
map.put(int.class, 4);
|
||||||
|
map.put(float.class, 4);
|
||||||
|
map.put(long.class, 8);
|
||||||
|
map.put(double.class, 8);
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
private AbstractChannelHandlerContext ctx;
|
private AbstractChannelHandlerContext ctx;
|
||||||
private Object msg;
|
private Object msg;
|
||||||
private ChannelPromise promise;
|
private ChannelPromise promise;
|
||||||
@ -872,12 +892,86 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
|||||||
super(handle);
|
super(handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static int estimateSize(Object o, Map<Class<?>, Integer> classSizes) {
|
||||||
|
int answer = 8 + estimateSize(o.getClass(), classSizes, null);
|
||||||
|
|
||||||
|
if (o instanceof ByteBuf) {
|
||||||
|
answer += ((ByteBuf) o).readableBytes();
|
||||||
|
} else if (o instanceof ByteBufHolder) {
|
||||||
|
answer += ((ByteBufHolder) o).content().readableBytes();
|
||||||
|
} else if (o instanceof FileRegion) {
|
||||||
|
// nothing to add.
|
||||||
|
} else if (o instanceof byte[]) {
|
||||||
|
answer += ((byte[]) o).length;
|
||||||
|
} else if (o instanceof ByteBuffer) {
|
||||||
|
answer += ((ByteBuffer) o).remaining();
|
||||||
|
} else if (o instanceof CharSequence) {
|
||||||
|
answer += ((CharSequence) o).length() << 1;
|
||||||
|
} else if (o instanceof Iterable<?>) {
|
||||||
|
for (Object m : (Iterable<?>) o) {
|
||||||
|
answer += estimateSize(m, classSizes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return align(answer);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int estimateSize(Class<?> clazz, Map<Class<?>, Integer> classSizes,
|
||||||
|
Set<Class<?>> visitedClasses) {
|
||||||
|
Integer objectSize = classSizes.get(clazz);
|
||||||
|
if (objectSize != null) {
|
||||||
|
return objectSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (visitedClasses != null) {
|
||||||
|
if (visitedClasses.contains(clazz)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
visitedClasses = new HashSet<Class<?>>();
|
||||||
|
}
|
||||||
|
|
||||||
|
visitedClasses.add(clazz);
|
||||||
|
|
||||||
|
int answer = 8; // Basic overhead.
|
||||||
|
for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
|
||||||
|
Field[] fields = c.getDeclaredFields();
|
||||||
|
for (Field f : fields) {
|
||||||
|
if ((f.getModifiers() & Modifier.STATIC) != 0) {
|
||||||
|
// Ignore static fields.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
answer += estimateSize(f.getType(), classSizes, visitedClasses);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
visitedClasses.remove(clazz);
|
||||||
|
|
||||||
|
// Some alignment.
|
||||||
|
answer = align(answer);
|
||||||
|
|
||||||
|
// Put the final answer.
|
||||||
|
classSizes.put(clazz, answer);
|
||||||
|
return answer;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int align(int size) {
|
||||||
|
return size + 8 - (size & 7);
|
||||||
|
}
|
||||||
|
|
||||||
protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx,
|
protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx,
|
||||||
Object msg, int size, ChannelPromise promise) {
|
Object msg, ChannelPromise promise) {
|
||||||
task.ctx = ctx;
|
task.ctx = ctx;
|
||||||
task.msg = msg;
|
task.msg = msg;
|
||||||
task.promise = promise;
|
task.promise = promise;
|
||||||
task.size = size;
|
task.size = ctx.channel.estimatorHandle().size(msg) + estimateSize(task, CLASS_SIZES.get());
|
||||||
|
|
||||||
|
ChannelOutboundBuffer buffer = ctx.channel.unsafe().outboundBuffer();
|
||||||
|
// Check for null as it may be set to null if the channel is closed already
|
||||||
|
if (buffer != null) {
|
||||||
|
buffer.incrementPendingOutboundBytes(task.size);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -919,9 +1013,9 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
|||||||
};
|
};
|
||||||
|
|
||||||
private static WriteTask newInstance(
|
private static WriteTask newInstance(
|
||||||
AbstractChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) {
|
AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
|
||||||
WriteTask task = RECYCLER.get();
|
WriteTask task = RECYCLER.get();
|
||||||
init(task, ctx, msg, size, promise);
|
init(task, ctx, msg, promise);
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -945,9 +1039,9 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
|||||||
};
|
};
|
||||||
|
|
||||||
private static WriteAndFlushTask newInstance(
|
private static WriteAndFlushTask newInstance(
|
||||||
AbstractChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) {
|
AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
|
||||||
WriteAndFlushTask task = RECYCLER.get();
|
WriteAndFlushTask task = RECYCLER.get();
|
||||||
init(task, ctx, msg, size, promise);
|
init(task, ctx, msg, promise);
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -47,9 +47,9 @@ public final class DefaultMessageSizeEstimator implements MessageSizeEstimator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the default implementation which returns {@code -1} for unknown messages.
|
* Return the default implementation which returns {@code 8} for unknown messages.
|
||||||
*/
|
*/
|
||||||
public static final MessageSizeEstimator DEFAULT = new DefaultMessageSizeEstimator(0);
|
public static final MessageSizeEstimator DEFAULT = new DefaultMessageSizeEstimator(8);
|
||||||
|
|
||||||
private final Handle handle;
|
private final Handle handle;
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user