Fix buffer leak in local transport when a close triggers the close of a remote peer and there are still messages in the inbound buffer.

Motivation:

We need to release all the buffers that may be put into our inbound queue since we closed the Channel to ensure we not leak any memory. This is fine as it basically gives the same guarantees as TCP which  means even if the promise was notified before its not really guaranteed that the "remote peer" will see the buffer at all.

Modifications:

Ensure we release all buffers in the inbound buffer if a doClose() is called.

Result:

No more leaks.
This commit is contained in:
Norman Maurer 2017-04-20 08:08:40 +02:00
parent 4dd6c14ba2
commit c663a94359
2 changed files with 126 additions and 53 deletions

View File

@ -62,7 +62,7 @@ public class LocalChannel extends AbstractChannel {
private final ChannelConfig config = new DefaultChannelConfig(this); private final ChannelConfig config = new DefaultChannelConfig(this);
// To further optimize this we could write our own SPSC queue. // To further optimize this we could write our own SPSC queue.
private final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue(); final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
private final Runnable readTask = new Runnable() { private final Runnable readTask = new Runnable() {
@Override @Override
public void run() { public void run() {
@ -220,7 +220,9 @@ public class LocalChannel extends AbstractChannel {
@Override @Override
protected void doClose() throws Exception { protected void doClose() throws Exception {
final LocalChannel peer = this.peer; final LocalChannel peer = this.peer;
if (state != State.CLOSED) { State oldState = state;
try {
if (oldState != State.CLOSED) {
// Update all internal state before the closeFuture is notified. // Update all internal state before the closeFuture is notified.
if (localAddress != null) { if (localAddress != null) {
if (parent() == null) { if (parent() == null) {
@ -248,7 +250,8 @@ public class LocalChannel extends AbstractChannel {
this.peer = null; this.peer = null;
// Need to execute the close in the correct EventLoop (see https://github.com/netty/netty/issues/1777). // Need to execute the close in the correct EventLoop (see https://github.com/netty/netty/issues/1777).
// Also check if the registration was not done yet. In this case we submit the close to the EventLoop // Also check if the registration was not done yet. In this case we submit the close to the EventLoop
// to make sure its run after the registration completes (see https://github.com/netty/netty/issues/2144). // to make sure its run after the registration completes
// (see https://github.com/netty/netty/issues/2144).
EventLoop peerEventLoop = peer.eventLoop(); EventLoop peerEventLoop = peer.eventLoop();
final boolean peerIsActive = peer.isActive(); final boolean peerIsActive = peer.isActive();
if (peerEventLoop.inEventLoop() && !registerInProgress) { if (peerEventLoop.inEventLoop() && !registerInProgress) {
@ -264,18 +267,27 @@ public class LocalChannel extends AbstractChannel {
} catch (Throwable cause) { } catch (Throwable cause) {
logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!", logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
this, peer, cause); this, peer, cause);
releaseInboundBuffers();
if (peerEventLoop.inEventLoop()) { if (peerEventLoop.inEventLoop()) {
peer.releaseInboundBuffers(); peer.releaseInboundBuffers();
} else { } else {
// inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or rejects // inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or
// the close Runnable but give a best effort. // rejects the close Runnable but give a best effort.
peer.close(); peer.close();
} }
PlatformDependent.throwException(cause); PlatformDependent.throwException(cause);
} }
} }
} }
} finally {
// Release all buffers if the Channel was already registered in the past and if it was not closed before.
if (oldState != null && oldState != State.CLOSED) {
// We need to release all the buffers that may be put into our inbound queue since we closed the Channel
// to ensure we not leak any memory. This is fine as it basically gives the same guarantees as TCP which
// means even if the promise was notified before its not really guaranteed that the "remote peer" will
// see the buffer at all.
releaseInboundBuffers();
}
}
} }
private void tryClose(boolean isActive) { private void tryClose(boolean isActive) {

View File

@ -896,4 +896,65 @@ public class LocalChannelTest {
ReferenceCountUtil.safeRelease(msg); ReferenceCountUtil.safeRelease(msg);
} }
} }
@Test
public void testNotLeakBuffersWhenCloseByRemotePeer() throws Exception {
Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap();
cb.group(sharedGroup)
.channel(LocalChannel.class)
.handler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(ctx.alloc().buffer().writeZero(100));
}
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
// Just drop the buffer
}
});
sb.group(sharedGroup)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
while (buffer.isReadable()) {
// Fill the ChannelOutboundBuffer with multiple buffers
ctx.write(buffer.readRetainedSlice(1));
}
// Flush and so transfer the written buffers to the inboundBuffer of the remote peer.
// After this point the remote peer is responsible to release all the buffers.
ctx.flush();
// This close call will trigger the remote peer close as well.
ctx.close();
}
});
}
});
Channel sc = null;
LocalChannel cc = null;
try {
// Start server
sc = sb.bind(TEST_ADDRESS).sync().channel();
// Connect to the server
cc = (LocalChannel) cb.connect(sc.localAddress()).sync().channel();
// Close the channel
closeChannel(cc);
assertTrue(cc.inboundBuffer.isEmpty());
closeChannel(sc);
} finally {
closeChannel(cc);
closeChannel(sc);
}
}
} }