Share some code, related to [#1707]]

This commit is contained in:
Norman Maurer 2013-08-07 20:28:33 +02:00
parent 5ef30b6d8b
commit b934b6009c

View File

@ -632,15 +632,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
next.invokeWrite(msg, promise); next.invokeWrite(msg, promise);
} else { } else {
final int size = channel.estimatorHandle().size(msg); submitWriteTask(next, executor, msg, false, promise);
if (size > 0) {
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.incrementPendingOutboundBytes(size);
}
}
executor.execute(WriteTask.newInstance(next, msg, size, false, promise));
} }
return promise; return promise;
@ -697,6 +689,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
next.invokeWrite(msg, promise); next.invokeWrite(msg, promise);
next.invokeFlush(); next.invokeFlush();
} else { } else {
submitWriteTask(next, executor, msg, true, promise);
}
return promise;
}
private void submitWriteTask(DefaultChannelHandlerContext next, EventExecutor executor,
Object msg, boolean flush, ChannelPromise promise) {
final int size = channel.estimatorHandle().size(msg); final int size = channel.estimatorHandle().size(msg);
if (size > 0) { if (size > 0) {
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer(); ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
@ -705,10 +705,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
buffer.incrementPendingOutboundBytes(size); buffer.incrementPendingOutboundBytes(size);
} }
} }
executor.execute(WriteTask.newInstance(next, msg, size, true, promise)); executor.execute(WriteTask.newInstance(next, msg, size, flush, promise));
}
return promise;
} }
@Override @Override