Fix NPE caused by re-entrance calls in FlowControlHandler (#9320)
Motivation: 2c99fc0f1290c65685c5036fdc9884921823ad7d introduced a change that eagly recycles the queue. Unfortunally it did not correct protect against re-entrance which can cause a NPE. Modifications: - Correctly protect against re-entrance by adding null checks - Add unit test Result: Fixes https://github.com/netty/netty/issues/9319.
This commit is contained in:
parent
65d8ecc3a0
commit
f7e8603d60
@ -171,37 +171,33 @@ public class FlowControlHandler implements ChannelHandler {
|
|||||||
* @see #channelRead(ChannelHandlerContext, Object)
|
* @see #channelRead(ChannelHandlerContext, Object)
|
||||||
*/
|
*/
|
||||||
private int dequeue(ChannelHandlerContext ctx, int minConsume) {
|
private int dequeue(ChannelHandlerContext ctx, int minConsume) {
|
||||||
if (queue != null) {
|
int consumed = 0;
|
||||||
|
|
||||||
int consumed = 0;
|
// fireChannelRead(...) may call ctx.read() and so this method may reentrance. Because of this we need to
|
||||||
|
// check if queue was set to null in the meantime and if so break the loop.
|
||||||
Object msg;
|
while (queue != null && (consumed < minConsume || config.isAutoRead())) {
|
||||||
while ((consumed < minConsume) || config.isAutoRead()) {
|
Object msg = queue.poll();
|
||||||
msg = queue.poll();
|
if (msg == null) {
|
||||||
if (msg == null) {
|
break;
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
++consumed;
|
|
||||||
ctx.fireChannelRead(msg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// We're firing a completion event every time one (or more)
|
++consumed;
|
||||||
// messages were consumed and the queue ended up being drained
|
ctx.fireChannelRead(msg);
|
||||||
// to an empty state.
|
|
||||||
if (queue.isEmpty()) {
|
|
||||||
queue.recycle();
|
|
||||||
queue = null;
|
|
||||||
|
|
||||||
if (consumed > 0) {
|
|
||||||
ctx.fireChannelReadComplete();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return consumed;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
// We're firing a completion event every time one (or more)
|
||||||
|
// messages were consumed and the queue ended up being drained
|
||||||
|
// to an empty state.
|
||||||
|
if (queue != null && queue.isEmpty()) {
|
||||||
|
queue.recycle();
|
||||||
|
queue = null;
|
||||||
|
|
||||||
|
if (consumed > 0) {
|
||||||
|
ctx.fireChannelReadComplete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return consumed;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -41,6 +41,7 @@ import java.net.SocketAddress;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Exchanger;
|
import java.util.concurrent.Exchanger;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import static java.util.concurrent.TimeUnit.*;
|
import static java.util.concurrent.TimeUnit.*;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
@ -367,6 +368,56 @@ public class FlowControlHandlerTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReentranceNotCausesNPE() throws Throwable {
|
||||||
|
final Exchanger<Channel> peerRef = new Exchanger<Channel>();
|
||||||
|
final CountDownLatch latch = new CountDownLatch(3);
|
||||||
|
final AtomicReference<Throwable> causeRef = new AtomicReference<Throwable>();
|
||||||
|
ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() {
|
||||||
|
@Override
|
||||||
|
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
ctx.fireChannelActive();
|
||||||
|
peerRef.exchange(ctx.channel(), 1L, SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||||
|
latch.countDown();
|
||||||
|
ctx.read();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||||
|
causeRef.set(cause);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
FlowControlHandler flow = new FlowControlHandler();
|
||||||
|
Channel server = newServer(false, flow, handler);
|
||||||
|
Channel client = newClient(server.localAddress());
|
||||||
|
try {
|
||||||
|
// The client connection on the server side
|
||||||
|
Channel peer = peerRef.exchange(null, 1L, SECONDS);
|
||||||
|
|
||||||
|
// Write the message
|
||||||
|
client.writeAndFlush(newOneMessage())
|
||||||
|
.syncUninterruptibly();
|
||||||
|
|
||||||
|
// channelRead(1)
|
||||||
|
peer.read();
|
||||||
|
assertTrue(latch.await(1L, SECONDS));
|
||||||
|
assertTrue(flow.isQueueEmpty());
|
||||||
|
|
||||||
|
Throwable cause = causeRef.get();
|
||||||
|
if (cause != null) {
|
||||||
|
throw cause;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
client.close();
|
||||||
|
server.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is a fictional message decoder. It decodes each {@code byte}
|
* This is a fictional message decoder. It decodes each {@code byte}
|
||||||
* into three strings.
|
* into three strings.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user