Move marking ChannelPromise for writes uncancellable to addFlush for keep things simple
This commit is contained in:
parent
5612472ae6
commit
1c9c797e82
@ -174,6 +174,18 @@ public final class ChannelOutboundBuffer {
|
|||||||
|
|
||||||
void addFlush() {
|
void addFlush() {
|
||||||
unflushed = tail;
|
unflushed = tail;
|
||||||
|
|
||||||
|
final int mask = buffer.length - 1;
|
||||||
|
int i = flushed;
|
||||||
|
while (i != unflushed && buffer[i].msg != null) {
|
||||||
|
Entry entry = buffer[i];
|
||||||
|
if (!entry.promise.setUncancellable()) {
|
||||||
|
// Was cancelled so make sure we free up memory and notify about the freed bytes
|
||||||
|
int pending = entry.cancel();
|
||||||
|
decrementPendingOutboundBytes(pending);
|
||||||
|
}
|
||||||
|
i = i + 1 & mask;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -255,12 +267,6 @@ public final class ChannelOutboundBuffer {
|
|||||||
} else {
|
} else {
|
||||||
// TODO: Think of a smart way to handle ByteBufHolder messages
|
// TODO: Think of a smart way to handle ByteBufHolder messages
|
||||||
Entry entry = buffer[flushed];
|
Entry entry = buffer[flushed];
|
||||||
if (!entry.cancelled && !entry.promise.setUncancellable()) {
|
|
||||||
// Was cancelled so make sure we free up memory and notify about the freed bytes
|
|
||||||
int pending = entry.cancel();
|
|
||||||
decrementPendingOutboundBytes(pending);
|
|
||||||
}
|
|
||||||
|
|
||||||
Object msg = entry.msg;
|
Object msg = entry.msg;
|
||||||
if (threadLocalDirectBufferSize <= 0 || !preferDirect) {
|
if (threadLocalDirectBufferSize <= 0 || !preferDirect) {
|
||||||
return msg;
|
return msg;
|
||||||
@ -400,52 +406,47 @@ public final class ChannelOutboundBuffer {
|
|||||||
Entry entry = buffer[i];
|
Entry entry = buffer[i];
|
||||||
|
|
||||||
if (!entry.cancelled) {
|
if (!entry.cancelled) {
|
||||||
if (!entry.promise.setUncancellable()) {
|
ByteBuf buf = (ByteBuf) m;
|
||||||
// Was cancelled so make sure we free up memory and notify about the freed bytes
|
final int readerIndex = buf.readerIndex();
|
||||||
int pending = entry.cancel();
|
final int readableBytes = buf.writerIndex() - readerIndex;
|
||||||
decrementPendingOutboundBytes(pending);
|
|
||||||
} else {
|
|
||||||
ByteBuf buf = (ByteBuf) m;
|
|
||||||
final int readerIndex = buf.readerIndex();
|
|
||||||
final int readableBytes = buf.writerIndex() - readerIndex;
|
|
||||||
|
|
||||||
if (readableBytes > 0) {
|
if (readableBytes > 0) {
|
||||||
nioBufferSize += readableBytes;
|
nioBufferSize += readableBytes;
|
||||||
int count = entry.count;
|
int count = entry.count;
|
||||||
if (count == -1) {
|
if (count == -1) {
|
||||||
//noinspection ConstantValueVariableUse
|
//noinspection ConstantValueVariableUse
|
||||||
entry.count = count = buf.nioBufferCount();
|
entry.count = count = buf.nioBufferCount();
|
||||||
}
|
}
|
||||||
int neededSpace = nioBufferCount + count;
|
int neededSpace = nioBufferCount + count;
|
||||||
if (neededSpace > nioBuffers.length) {
|
if (neededSpace > nioBuffers.length) {
|
||||||
this.nioBuffers = nioBuffers =
|
this.nioBuffers = nioBuffers =
|
||||||
expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
|
expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
|
||||||
}
|
}
|
||||||
if (buf.isDirect() || threadLocalDirectBufferSize <= 0) {
|
if (buf.isDirect() || threadLocalDirectBufferSize <= 0) {
|
||||||
if (count == 1) {
|
if (count == 1) {
|
||||||
ByteBuffer nioBuf = entry.buf;
|
ByteBuffer nioBuf = entry.buf;
|
||||||
if (nioBuf == null) {
|
if (nioBuf == null) {
|
||||||
// cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
|
// cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
|
||||||
// derived buffer
|
// derived buffer
|
||||||
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
|
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
|
||||||
}
|
|
||||||
nioBuffers[nioBufferCount ++] = nioBuf;
|
|
||||||
} else {
|
|
||||||
ByteBuffer[] nioBufs = entry.buffers;
|
|
||||||
if (nioBufs == null) {
|
|
||||||
// cached ByteBuffers as they may be expensive to create in terms
|
|
||||||
// of Object allocation
|
|
||||||
entry.buffers = nioBufs = buf.nioBuffers();
|
|
||||||
}
|
|
||||||
nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
|
|
||||||
}
|
}
|
||||||
|
nioBuffers[nioBufferCount ++] = nioBuf;
|
||||||
} else {
|
} else {
|
||||||
nioBufferCount = fillBufferArrayNonDirect(entry, buf, readerIndex,
|
ByteBuffer[] nioBufs = entry.buffers;
|
||||||
readableBytes, alloc, nioBuffers, nioBufferCount);
|
if (nioBufs == null) {
|
||||||
|
// cached ByteBuffers as they may be expensive to create in terms
|
||||||
|
// of Object allocation
|
||||||
|
entry.buffers = nioBufs = buf.nioBuffers();
|
||||||
|
}
|
||||||
|
nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
nioBufferCount = fillBufferArrayNonDirect(entry, buf, readerIndex,
|
||||||
|
readableBytes, alloc, nioBuffers, nioBufferCount);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
i = i + 1 & mask;
|
i = i + 1 & mask;
|
||||||
}
|
}
|
||||||
this.nioBufferCount = nioBufferCount;
|
this.nioBufferCount = nioBufferCount;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user