diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 638207b5b3..4f544bc67a 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -415,7 +415,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha "Force-closing a channel whose registration task was unaccepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); - closeFuture.setClosed(); promise.setFailure(t); } } @@ -441,12 +440,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); - closeFuture.setClosed(); if (!promise.tryFailure(t)) { logger.warn( "Tried to fail the registration promise, but it is complete already. " + "Swallowing the cause of the registration failure:", t); } + closeFuture.setClosed(); } } @@ -515,39 +514,38 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return; } - if (closeFuture.isSuccess()) { - // Closed already. - promise.setSuccess(); - } - boolean wasActive = isActive(); - ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; - this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. + if (closeFuture.setClosed()) { + ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; + this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. - try { - doClose(); - closeFuture.setClosed(); - promise.setSuccess(); - } catch (Throwable t) { - promise.setFailure(t); - } - - // Fail all the queued messages - try { - outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); - outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION); - } finally { - - if (wasActive && !isActive()) { - invokeLater(new Runnable() { - @Override - public void run() { - pipeline.fireChannelInactive(); - } - }); + try { + doClose(); + promise.setSuccess(); + } catch (Throwable t) { + promise.setFailure(t); } - deregister(voidPromise()); + // Fail all the queued messages + try { + outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); + outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION); + } finally { + + if (wasActive && !isActive()) { + invokeLater(new Runnable() { + @Override + public void run() { + pipeline.fireChannelInactive(); + } + }); + } + + deregister(voidPromise()); + } + } else { + // Closed already. + promise.setSuccess(); } } @@ -752,6 +750,14 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha */ protected abstract void doDisconnect() throws Exception; + /** + * Will be called before the actual close operation will be performed. Sub-classes may override this as the default + * is to do nothing. + */ + protected void doPreClose() throws Exception { + // NOOP by default + } + /** * Close the {@link Channel} */ @@ -828,6 +834,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } boolean setClosed() { + try { + doPreClose(); + } catch (Exception e) { + logger.warn("doPreClose() raised an exception.", e); + } return super.trySuccess(); } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index 13b3c57abb..cc4acb2050 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -199,12 +199,13 @@ public class LocalChannel extends AbstractChannel { } @Override - protected void doClose() throws Exception { + protected void doPreClose() throws Exception { if (state > 2) { // Closed already return; } + // Update all internal state before the closeFuture is notified. if (localAddress != null) { if (parent() == null) { LocalChannelRegistry.unregister(localAddress); @@ -212,7 +213,10 @@ public class LocalChannel extends AbstractChannel { localAddress = null; } state = 3; + } + @Override + protected void doClose() throws Exception { LocalChannel peer = this.peer; if (peer != null && peer.isActive()) { peer.unsafe().close(unsafe().voidPromise()); diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java index fc6c2d031e..7f279cf440 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java @@ -94,17 +94,23 @@ public class LocalServerChannel extends AbstractServerChannel { } @Override - protected void doClose() throws Exception { + protected void doPreClose() throws Exception { if (state > 1) { // Closed already. return; } + // Update all internal state before the closeFuture is notified. LocalChannelRegistry.unregister(localAddress); localAddress = null; state = 2; } + @Override + protected void doClose() throws Exception { + // All internal state was updated already at doPreClose(). + } + @Override protected Runnable doDeregister() throws Exception { ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);