From eb4d317b9d64f3945a209804fec4c3fe695f4f9f Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Wed, 16 Nov 2016 14:03:57 -0800 Subject: [PATCH] Fix LocalChannel close sequence Motivation: LocalChannel attempts to close its peer socket when ever it is closed. However if the channels are on different EventLoops we may attempt to process events for the peer channel on the wrong EventLoop. Modifications: - Ensure the close process ensures we are on the correct thread before accessing data Result: More correct LocalChannel close code. --- .../io/netty/channel/local/LocalChannel.java | 84 +++++++++++-------- 1 file changed, 49 insertions(+), 35 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index a56aa698f8..980cc73f49 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -31,6 +31,8 @@ import io.netty.util.concurrent.SingleThreadEventExecutor; import io.netty.util.internal.InternalThreadLocalMap; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.ThrowableUtil; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.ConnectException; import java.net.SocketAddress; @@ -45,9 +47,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; * A {@link Channel} for the local transport. */ public class LocalChannel extends AbstractChannel { - - private enum State { OPEN, BOUND, CONNECTED, CLOSED } - + private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannel.class); @SuppressWarnings({ "rawtypes" }) private static final AtomicReferenceFieldUpdater FINISH_READ_FUTURE_UPDATER; private static final ChannelMetadata METADATA = new ChannelMetadata(false); @@ -57,6 +57,8 @@ public class LocalChannel extends AbstractChannel { private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( new ClosedChannelException(), LocalChannel.class, "doClose()"); + private enum State { OPEN, BOUND, CONNECTED, CLOSED } + private final ChannelConfig config = new DefaultChannelConfig(this); // To further optimize this we could write our own SPSC queue. private final Queue inboundBuffer = PlatformDependent.newSpscQueue(); @@ -241,50 +243,57 @@ public class LocalChannel extends AbstractChannel { // channelRead. state = State.CLOSED; + // Preserve order of event and force a read operation now before the close operation is processed. + finishPeerRead(this); + ChannelPromise promise = connectPromise; if (promise != null) { // Use tryFailure() instead of setFailure() to avoid the race against cancel(). promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION); connectPromise = null; } - - // To preserve ordering of events we must process any pending reads - if (writeInProgress && peer != null) { - finishPeerRead(peer); - } } - if (peer != null && peer.isActive()) { + if (peer != null) { + this.peer = null; // Need to execute the close in the correct EventLoop (see https://github.com/netty/netty/issues/1777). // Also check if the registration was not done yet. In this case we submit the close to the EventLoop // to make sure its run after the registration completes (see https://github.com/netty/netty/issues/2144). - if (peer.eventLoop().inEventLoop() && !registerInProgress) { - doPeerClose(peer, peer.writeInProgress); + EventLoop peerEventLoop = peer.eventLoop(); + final boolean peerIsActive = peer.isActive(); + if (peerEventLoop.inEventLoop() && !registerInProgress) { + peer.tryClose(peerIsActive); } else { - // This value may change, and so we should save it before executing the Runnable. - final boolean peerWriteInProgress = peer.writeInProgress; try { - peer.eventLoop().execute(new Runnable() { + peerEventLoop.execute(new Runnable() { @Override public void run() { - doPeerClose(peer, peerWriteInProgress); + peer.tryClose(peerIsActive); } }); - } catch (RuntimeException e) { - // The peer close may attempt to drain this.inboundBuffers. If that fails make sure it is drained. + } catch (Throwable cause) { + logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!", + this, peer, cause); releaseInboundBuffers(); - throw e; + if (peerEventLoop.inEventLoop()) { + peer.releaseInboundBuffers(); + } else { + // inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or rejects + // the close Runnable but give a best effort. + peer.close(); + } + PlatformDependent.throwException(cause); } } - this.peer = null; } } - private void doPeerClose(LocalChannel peer, boolean peerWriteInProgress) { - if (peerWriteInProgress) { - finishPeerRead0(this); + private void tryClose(boolean isActive) { + if (isActive) { + unsafe().close(unsafe().voidPromise()); + } else { + releaseInboundBuffers(); } - peer.unsafe().close(peer.unsafe().voidPromise()); } @Override @@ -325,9 +334,11 @@ public class LocalChannel extends AbstractChannel { } else { try { eventLoop().execute(readTask); - } catch (RuntimeException e) { - releaseInboundBuffers(); - throw e; + } catch (Throwable cause) { + logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause); + close(); + peer.close(); + PlatformDependent.throwException(cause); } } } @@ -402,19 +413,22 @@ public class LocalChannel extends AbstractChannel { } else { peer.eventLoop().execute(finishPeerReadTask); } - } catch (RuntimeException e) { - peer.releaseInboundBuffers(); - throw e; + } catch (Throwable cause) { + logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause); + close(); + peer.close(); + PlatformDependent.throwException(cause); } } private void releaseInboundBuffers() { - for (;;) { - Object o = inboundBuffer.poll(); - if (o == null) { - break; - } - ReferenceCountUtil.release(o); + if (readInProgress) { + return; + } + Queue inboundBuffer = this.inboundBuffer; + Object msg; + while ((msg = inboundBuffer.poll()) != null) { + ReferenceCountUtil.release(msg); } }