From 1c9c797e821636b9bef2f22bd6653840fd7c3c2f Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 17 Feb 2014 16:14:25 +0100 Subject: [PATCH] Move marking ChannelPromise for writes uncancellable to addFlush for keep things simple --- .../netty/channel/ChannelOutboundBuffer.java | 91 ++++++++++--------- 1 file changed, 46 insertions(+), 45 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index dc36b71c71..8228987e48 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -174,6 +174,18 @@ public final class ChannelOutboundBuffer { void addFlush() { unflushed = tail; + + final int mask = buffer.length - 1; + int i = flushed; + while (i != unflushed && buffer[i].msg != null) { + Entry entry = buffer[i]; + if (!entry.promise.setUncancellable()) { + // Was cancelled so make sure we free up memory and notify about the freed bytes + int pending = entry.cancel(); + decrementPendingOutboundBytes(pending); + } + i = i + 1 & mask; + } } /** @@ -255,12 +267,6 @@ public final class ChannelOutboundBuffer { } else { // TODO: Think of a smart way to handle ByteBufHolder messages Entry entry = buffer[flushed]; - if (!entry.cancelled && !entry.promise.setUncancellable()) { - // Was cancelled so make sure we free up memory and notify about the freed bytes - int pending = entry.cancel(); - decrementPendingOutboundBytes(pending); - } - Object msg = entry.msg; if (threadLocalDirectBufferSize <= 0 || !preferDirect) { return msg; @@ -400,52 +406,47 @@ public final class ChannelOutboundBuffer { Entry entry = buffer[i]; if (!entry.cancelled) { - if (!entry.promise.setUncancellable()) { - // Was cancelled so make sure we free up memory and notify about the freed bytes - int pending = entry.cancel(); - decrementPendingOutboundBytes(pending); - } else { - ByteBuf buf = (ByteBuf) m; - final int readerIndex = buf.readerIndex(); - final int readableBytes = buf.writerIndex() - readerIndex; + ByteBuf buf = (ByteBuf) m; + final int readerIndex = buf.readerIndex(); + final int readableBytes = buf.writerIndex() - readerIndex; - if (readableBytes > 0) { - nioBufferSize += readableBytes; - int count = entry.count; - if (count == -1) { - //noinspection ConstantValueVariableUse - entry.count = count = buf.nioBufferCount(); - } - int neededSpace = nioBufferCount + count; - if (neededSpace > nioBuffers.length) { - this.nioBuffers = nioBuffers = - expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); - } - if (buf.isDirect() || threadLocalDirectBufferSize <= 0) { - if (count == 1) { - ByteBuffer nioBuf = entry.buf; - if (nioBuf == null) { - // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a - // derived buffer - entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); - } - nioBuffers[nioBufferCount ++] = nioBuf; - } else { - ByteBuffer[] nioBufs = entry.buffers; - if (nioBufs == null) { - // cached ByteBuffers as they may be expensive to create in terms - // of Object allocation - entry.buffers = nioBufs = buf.nioBuffers(); - } - nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount); + if (readableBytes > 0) { + nioBufferSize += readableBytes; + int count = entry.count; + if (count == -1) { + //noinspection ConstantValueVariableUse + entry.count = count = buf.nioBufferCount(); + } + int neededSpace = nioBufferCount + count; + if (neededSpace > nioBuffers.length) { + this.nioBuffers = nioBuffers = + expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); + } + if (buf.isDirect() || threadLocalDirectBufferSize <= 0) { + if (count == 1) { + ByteBuffer nioBuf = entry.buf; + if (nioBuf == null) { + // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a + // derived buffer + entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); } + nioBuffers[nioBufferCount ++] = nioBuf; } else { - nioBufferCount = fillBufferArrayNonDirect(entry, buf, readerIndex, - readableBytes, alloc, nioBuffers, nioBufferCount); + ByteBuffer[] nioBufs = entry.buffers; + if (nioBufs == null) { + // cached ByteBuffers as they may be expensive to create in terms + // of Object allocation + entry.buffers = nioBufs = buf.nioBuffers(); + } + nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount); } + } else { + nioBufferCount = fillBufferArrayNonDirect(entry, buf, readerIndex, + readableBytes, alloc, nioBuffers, nioBufferCount); } } } + i = i + 1 & mask; } this.nioBufferCount = nioBufferCount;