Fix LocalChannel close sequence

Motivation:
LocalChannel attempts to close its peer socket when ever it is closed. However if the channels are on different EventLoops we may attempt to process events for the peer channel on the wrong EventLoop.

Modifications:
- Ensure the close process runs on the peer's EventLoop before accessing the peer channel's state

Result:
LocalChannel's close sequence now always executes peer cleanup on the correct EventLoop.
This commit is contained in:
Scott Mitchell 2016-11-16 14:03:57 -08:00
parent 886a7aae46
commit eb4d317b9d

View File

@ -31,6 +31,8 @@ import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.util.internal.InternalThreadLocalMap; import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThrowableUtil; import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.SocketAddress; import java.net.SocketAddress;
@ -45,9 +47,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
* A {@link Channel} for the local transport. * A {@link Channel} for the local transport.
*/ */
public class LocalChannel extends AbstractChannel { public class LocalChannel extends AbstractChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannel.class);
private enum State { OPEN, BOUND, CONNECTED, CLOSED }
@SuppressWarnings({ "rawtypes" }) @SuppressWarnings({ "rawtypes" })
private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER; private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER;
private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final ChannelMetadata METADATA = new ChannelMetadata(false);
@ -57,6 +57,8 @@ public class LocalChannel extends AbstractChannel {
private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), LocalChannel.class, "doClose()"); new ClosedChannelException(), LocalChannel.class, "doClose()");
private enum State { OPEN, BOUND, CONNECTED, CLOSED }
private final ChannelConfig config = new DefaultChannelConfig(this); private final ChannelConfig config = new DefaultChannelConfig(this);
// To further optimize this we could write our own SPSC queue. // To further optimize this we could write our own SPSC queue.
private final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue(); private final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
@ -241,50 +243,57 @@ public class LocalChannel extends AbstractChannel {
// channelRead. // channelRead.
state = State.CLOSED; state = State.CLOSED;
// Preserve order of event and force a read operation now before the close operation is processed.
finishPeerRead(this);
ChannelPromise promise = connectPromise; ChannelPromise promise = connectPromise;
if (promise != null) { if (promise != null) {
// Use tryFailure() instead of setFailure() to avoid the race against cancel(). // Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION); promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
connectPromise = null; connectPromise = null;
} }
// To preserve ordering of events we must process any pending reads
if (writeInProgress && peer != null) {
finishPeerRead(peer);
}
} }
if (peer != null && peer.isActive()) { if (peer != null) {
this.peer = null;
// Need to execute the close in the correct EventLoop (see https://github.com/netty/netty/issues/1777). // Need to execute the close in the correct EventLoop (see https://github.com/netty/netty/issues/1777).
// Also check if the registration was not done yet. In this case we submit the close to the EventLoop // Also check if the registration was not done yet. In this case we submit the close to the EventLoop
// to make sure its run after the registration completes (see https://github.com/netty/netty/issues/2144). // to make sure its run after the registration completes (see https://github.com/netty/netty/issues/2144).
if (peer.eventLoop().inEventLoop() && !registerInProgress) { EventLoop peerEventLoop = peer.eventLoop();
doPeerClose(peer, peer.writeInProgress); final boolean peerIsActive = peer.isActive();
if (peerEventLoop.inEventLoop() && !registerInProgress) {
peer.tryClose(peerIsActive);
} else { } else {
// This value may change, and so we should save it before executing the Runnable.
final boolean peerWriteInProgress = peer.writeInProgress;
try { try {
peer.eventLoop().execute(new Runnable() { peerEventLoop.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
doPeerClose(peer, peerWriteInProgress); peer.tryClose(peerIsActive);
} }
}); });
} catch (RuntimeException e) { } catch (Throwable cause) {
// The peer close may attempt to drain this.inboundBuffers. If that fails make sure it is drained. logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
this, peer, cause);
releaseInboundBuffers(); releaseInboundBuffers();
throw e; if (peerEventLoop.inEventLoop()) {
peer.releaseInboundBuffers();
} else {
// inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or rejects
// the close Runnable but give a best effort.
peer.close();
}
PlatformDependent.throwException(cause);
} }
} }
this.peer = null;
} }
} }
private void doPeerClose(LocalChannel peer, boolean peerWriteInProgress) { private void tryClose(boolean isActive) {
if (peerWriteInProgress) { if (isActive) {
finishPeerRead0(this); unsafe().close(unsafe().voidPromise());
} else {
releaseInboundBuffers();
} }
peer.unsafe().close(peer.unsafe().voidPromise());
} }
@Override @Override
@ -325,9 +334,11 @@ public class LocalChannel extends AbstractChannel {
} else { } else {
try { try {
eventLoop().execute(readTask); eventLoop().execute(readTask);
} catch (RuntimeException e) { } catch (Throwable cause) {
releaseInboundBuffers(); logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
throw e; close();
peer.close();
PlatformDependent.throwException(cause);
} }
} }
} }
@ -402,19 +413,22 @@ public class LocalChannel extends AbstractChannel {
} else { } else {
peer.eventLoop().execute(finishPeerReadTask); peer.eventLoop().execute(finishPeerReadTask);
} }
} catch (RuntimeException e) { } catch (Throwable cause) {
peer.releaseInboundBuffers(); logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
throw e; close();
peer.close();
PlatformDependent.throwException(cause);
} }
} }
private void releaseInboundBuffers() { private void releaseInboundBuffers() {
for (;;) { if (readInProgress) {
Object o = inboundBuffer.poll(); return;
if (o == null) { }
break; Queue<Object> inboundBuffer = this.inboundBuffer;
} Object msg;
ReferenceCountUtil.release(o); while ((msg = inboundBuffer.poll()) != null) {
ReferenceCountUtil.release(msg);
} }
} }