Guard against re-entrancy issues while draining AbstractCoalescingBufferQueue (#10294)

Motivation:

AbstractCoalescingBufferQueue had a bug which could lead to an empty queue while still report bytes left. This was due the fact that we decremented the pending bytes before draining the queue one-by-one. The problem here is that while the queue is drained we may notify the promise which may add again buffers to the queue for which we never decrement the bytes while we drain these

Modifications:

- Decrement the pending bytes every time we drain a buffer from the queue
- Add unit tests

Result:

Fixes https://github.com/netty/netty/issues/10286
This commit is contained in:
Norman Maurer 2020-05-15 09:51:33 +02:00
parent 6ff2089ee4
commit 67622a8141
2 changed files with 96 additions and 3 deletions

View File

@ -140,6 +140,7 @@ public abstract class AbstractCoalescingBufferQueue {
// Use isEmpty rather than readableBytes==0 as we may have a promise associated with an empty buffer. // Use isEmpty rather than readableBytes==0 as we may have a promise associated with an empty buffer.
if (bufAndListenerPairs.isEmpty()) { if (bufAndListenerPairs.isEmpty()) {
assert readableBytes == 0;
return removeEmptyValue(); return removeEmptyValue();
} }
bytes = Math.min(bytes, readableBytes); bytes = Math.min(bytes, readableBytes);
@ -221,7 +222,6 @@ public abstract class AbstractCoalescingBufferQueue {
* @param ctx The context to write all elements to. * @param ctx The context to write all elements to.
*/ */
public final void writeAndRemoveAll(ChannelHandlerContext ctx) { public final void writeAndRemoveAll(ChannelHandlerContext ctx) {
decrementReadableBytes(readableBytes);
Throwable pending = null; Throwable pending = null;
ByteBuf previousBuf = null; ByteBuf previousBuf = null;
for (;;) { for (;;) {
@ -229,6 +229,7 @@ public abstract class AbstractCoalescingBufferQueue {
try { try {
if (entry == null) { if (entry == null) {
if (previousBuf != null) { if (previousBuf != null) {
decrementReadableBytes(previousBuf.readableBytes());
ctx.write(previousBuf, ctx.voidPromise()); ctx.write(previousBuf, ctx.voidPromise());
} }
break; break;
@ -236,13 +237,16 @@ public abstract class AbstractCoalescingBufferQueue {
if (entry instanceof ByteBuf) { if (entry instanceof ByteBuf) {
if (previousBuf != null) { if (previousBuf != null) {
decrementReadableBytes(previousBuf.readableBytes());
ctx.write(previousBuf, ctx.voidPromise()); ctx.write(previousBuf, ctx.voidPromise());
} }
previousBuf = (ByteBuf) entry; previousBuf = (ByteBuf) entry;
} else if (entry instanceof ChannelPromise) { } else if (entry instanceof ChannelPromise) {
decrementReadableBytes(previousBuf.readableBytes());
ctx.write(previousBuf, (ChannelPromise) entry); ctx.write(previousBuf, (ChannelPromise) entry);
previousBuf = null; previousBuf = null;
} else { } else {
decrementReadableBytes(previousBuf.readableBytes());
ctx.write(previousBuf).addListener((ChannelFutureListener) entry); ctx.write(previousBuf).addListener((ChannelFutureListener) entry);
previousBuf = null; previousBuf = null;
} }
@ -326,7 +330,6 @@ public abstract class AbstractCoalescingBufferQueue {
} }
private void releaseAndCompleteAll(ChannelFuture future) { private void releaseAndCompleteAll(ChannelFuture future) {
decrementReadableBytes(readableBytes);
Throwable pending = null; Throwable pending = null;
for (;;) { for (;;) {
Object entry = bufAndListenerPairs.poll(); Object entry = bufAndListenerPairs.poll();
@ -335,7 +338,9 @@ public abstract class AbstractCoalescingBufferQueue {
} }
try { try {
if (entry instanceof ByteBuf) { if (entry instanceof ByteBuf) {
safeRelease(entry); ByteBuf buffer = (ByteBuf) entry;
decrementReadableBytes(buffer.readableBytes());
safeRelease(buffer);
} else { } else {
((ChannelFutureListener) entry).operationComplete(future); ((ChannelFutureListener) entry).operationComplete(future);
} }

View File

@ -0,0 +1,88 @@
/*
* Copyright 2020 The Netty Project
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.ReferenceCountUtil;
import org.junit.Test;
import java.nio.channels.ClosedChannelException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class AbstractCoalescingBufferQueueTest {
// See https://github.com/netty/netty/issues/10286
@Test
public void testDecrementAllWhenWriteAndRemoveAll() {
testDecrementAll(true);
}
// See https://github.com/netty/netty/issues/10286
@Test
public void testDecrementAllWhenReleaseAndFailAll() {
testDecrementAll(false);
}
private static void testDecrementAll(boolean write) {
EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ReferenceCountUtil.release(msg);
promise.setSuccess();
}
}, new ChannelHandlerAdapter() { });
final AbstractCoalescingBufferQueue queue = new AbstractCoalescingBufferQueue(channel, 128) {
@Override
protected ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
return composeIntoComposite(alloc, cumulation, next);
}
@Override
protected ByteBuf removeEmptyValue() {
return Unpooled.EMPTY_BUFFER;
}
};
final byte[] bytes = new byte[128];
queue.add(Unpooled.wrappedBuffer(bytes), future -> {
queue.add(Unpooled.wrappedBuffer(bytes));
assertEquals(bytes.length, queue.readableBytes());
});
assertEquals(bytes.length, queue.readableBytes());
ChannelHandlerContext ctx = channel.pipeline().lastContext();
if (write) {
queue.writeAndRemoveAll(ctx);
} else {
queue.releaseAndFailAll(ctx, new ClosedChannelException());
}
ByteBuf buffer = queue.remove(channel.alloc(), 128, channel.newPromise());
assertFalse(buffer.isReadable());
buffer.release();
assertTrue(queue.isEmpty());
assertEquals(0, queue.readableBytes());
assertFalse(channel.finish());
}
}