Revert "remove AbstractChannel#doPreClose"

This reverts commit 241f856cc721dbf786321a94c987c5026eea5412.
This commit is contained in:
Norman Maurer 2013-07-26 11:23:11 +02:00
parent 241f856cc7
commit 5f363e8ade
3 changed files with 54 additions and 33 deletions

View File

@ -415,7 +415,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
"Force-closing a channel whose registration task was unaccepted by an event loop: {}", "Force-closing a channel whose registration task was unaccepted by an event loop: {}",
AbstractChannel.this, t); AbstractChannel.this, t);
closeForcibly(); closeForcibly();
closeFuture.setClosed();
promise.setFailure(t); promise.setFailure(t);
} }
} }
@ -441,12 +440,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} catch (Throwable t) { } catch (Throwable t) {
// Close the channel directly to avoid FD leak. // Close the channel directly to avoid FD leak.
closeForcibly(); closeForcibly();
closeFuture.setClosed();
if (!promise.tryFailure(t)) { if (!promise.tryFailure(t)) {
logger.warn( logger.warn(
"Tried to fail the registration promise, but it is complete already. " + "Tried to fail the registration promise, but it is complete already. " +
"Swallowing the cause of the registration failure:", t); "Swallowing the cause of the registration failure:", t);
} }
closeFuture.setClosed();
} }
} }
@ -515,39 +514,38 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return; return;
} }
if (closeFuture.isSuccess()) {
// Closed already.
promise.setSuccess();
}
boolean wasActive = isActive(); boolean wasActive = isActive();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (closeFuture.setClosed()) {
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
try { try {
doClose(); doClose();
closeFuture.setClosed(); promise.setSuccess();
promise.setSuccess(); } catch (Throwable t) {
} catch (Throwable t) { promise.setFailure(t);
promise.setFailure(t);
}
// Fail all the queued messages
try {
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
} finally {
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelInactive();
}
});
} }
deregister(voidPromise()); // Fail all the queued messages
try {
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
} finally {
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelInactive();
}
});
}
deregister(voidPromise());
}
} else {
// Closed already.
promise.setSuccess();
} }
} }
@ -752,6 +750,14 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
*/ */
protected abstract void doDisconnect() throws Exception; protected abstract void doDisconnect() throws Exception;
/**
* Will be called before the actual close operation will be performed. Sub-classes may override this as the default
* is to do nothing.
*/
protected void doPreClose() throws Exception {
// NOOP by default
}
/** /**
* Close the {@link Channel} * Close the {@link Channel}
*/ */
@ -828,6 +834,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} }
boolean setClosed() { boolean setClosed() {
try {
doPreClose();
} catch (Exception e) {
logger.warn("doPreClose() raised an exception.", e);
}
return super.trySuccess(); return super.trySuccess();
} }
} }

View File

@ -199,12 +199,13 @@ public class LocalChannel extends AbstractChannel {
} }
@Override @Override
protected void doClose() throws Exception { protected void doPreClose() throws Exception {
if (state > 2) { if (state > 2) {
// Closed already // Closed already
return; return;
} }
// Update all internal state before the closeFuture is notified.
if (localAddress != null) { if (localAddress != null) {
if (parent() == null) { if (parent() == null) {
LocalChannelRegistry.unregister(localAddress); LocalChannelRegistry.unregister(localAddress);
@ -212,7 +213,10 @@ public class LocalChannel extends AbstractChannel {
localAddress = null; localAddress = null;
} }
state = 3; state = 3;
}
@Override
protected void doClose() throws Exception {
LocalChannel peer = this.peer; LocalChannel peer = this.peer;
if (peer != null && peer.isActive()) { if (peer != null && peer.isActive()) {
peer.unsafe().close(unsafe().voidPromise()); peer.unsafe().close(unsafe().voidPromise());

View File

@ -94,17 +94,23 @@ public class LocalServerChannel extends AbstractServerChannel {
} }
@Override @Override
protected void doClose() throws Exception { protected void doPreClose() throws Exception {
if (state > 1) { if (state > 1) {
// Closed already. // Closed already.
return; return;
} }
// Update all internal state before the closeFuture is notified.
LocalChannelRegistry.unregister(localAddress); LocalChannelRegistry.unregister(localAddress);
localAddress = null; localAddress = null;
state = 2; state = 2;
} }
@Override
protected void doClose() throws Exception {
// All internal state was updated already at doPreClose().
}
@Override @Override
protected Runnable doDeregister() throws Exception { protected Runnable doDeregister() throws Exception {
((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook); ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);