remove AbstractChannel#doPreClose
This commit is contained in:
parent
fabefba791
commit
241f856cc7
@ -415,6 +415,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
"Force-closing a channel whose registration task was unaccepted by an event loop: {}",
|
||||
AbstractChannel.this, t);
|
||||
closeForcibly();
|
||||
closeFuture.setClosed();
|
||||
promise.setFailure(t);
|
||||
}
|
||||
}
|
||||
@ -440,12 +441,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
} catch (Throwable t) {
|
||||
// Close the channel directly to avoid FD leak.
|
||||
closeForcibly();
|
||||
closeFuture.setClosed();
|
||||
if (!promise.tryFailure(t)) {
|
||||
logger.warn(
|
||||
"Tried to fail the registration promise, but it is complete already. " +
|
||||
"Swallowing the cause of the registration failure:", t);
|
||||
}
|
||||
closeFuture.setClosed();
|
||||
}
|
||||
}
|
||||
|
||||
@ -514,39 +515,40 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
return;
|
||||
}
|
||||
|
||||
boolean wasActive = isActive();
|
||||
if (closeFuture.setClosed()) {
|
||||
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
|
||||
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
|
||||
|
||||
try {
|
||||
doClose();
|
||||
promise.setSuccess();
|
||||
} catch (Throwable t) {
|
||||
promise.setFailure(t);
|
||||
}
|
||||
|
||||
// Fail all the queued messages
|
||||
try {
|
||||
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
|
||||
outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
|
||||
} finally {
|
||||
|
||||
if (wasActive && !isActive()) {
|
||||
invokeLater(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
pipeline.fireChannelInactive();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
deregister(voidPromise());
|
||||
}
|
||||
} else {
|
||||
if (closeFuture.isSuccess()) {
|
||||
// Closed already.
|
||||
promise.setSuccess();
|
||||
}
|
||||
|
||||
boolean wasActive = isActive();
|
||||
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
|
||||
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
|
||||
|
||||
try {
|
||||
doClose();
|
||||
closeFuture.setClosed();
|
||||
promise.setSuccess();
|
||||
} catch (Throwable t) {
|
||||
promise.setFailure(t);
|
||||
}
|
||||
|
||||
// Fail all the queued messages
|
||||
try {
|
||||
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
|
||||
outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
|
||||
} finally {
|
||||
|
||||
if (wasActive && !isActive()) {
|
||||
invokeLater(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
pipeline.fireChannelInactive();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
deregister(voidPromise());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -750,14 +752,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
*/
|
||||
protected abstract void doDisconnect() throws Exception;
|
||||
|
||||
/**
|
||||
* Will be called before the actual close operation will be performed. Sub-classes may override this as the default
|
||||
* is to do nothing.
|
||||
*/
|
||||
protected void doPreClose() throws Exception {
|
||||
// NOOP by default
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the {@link Channel}
|
||||
*/
|
||||
@ -834,11 +828,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
}
|
||||
|
||||
boolean setClosed() {
|
||||
try {
|
||||
doPreClose();
|
||||
} catch (Exception e) {
|
||||
logger.warn("doPreClose() raised an exception.", e);
|
||||
}
|
||||
return super.trySuccess();
|
||||
}
|
||||
}
|
||||
|
@ -199,13 +199,12 @@ public class LocalChannel extends AbstractChannel {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doPreClose() throws Exception {
|
||||
protected void doClose() throws Exception {
|
||||
if (state > 2) {
|
||||
// Closed already
|
||||
return;
|
||||
}
|
||||
|
||||
// Update all internal state before the closeFuture is notified.
|
||||
if (localAddress != null) {
|
||||
if (parent() == null) {
|
||||
LocalChannelRegistry.unregister(localAddress);
|
||||
@ -213,10 +212,7 @@ public class LocalChannel extends AbstractChannel {
|
||||
localAddress = null;
|
||||
}
|
||||
state = 3;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws Exception {
|
||||
LocalChannel peer = this.peer;
|
||||
if (peer != null && peer.isActive()) {
|
||||
peer.unsafe().close(unsafe().voidPromise());
|
||||
|
@ -94,23 +94,17 @@ public class LocalServerChannel extends AbstractServerChannel {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doPreClose() throws Exception {
|
||||
protected void doClose() throws Exception {
|
||||
if (state > 1) {
|
||||
// Closed already.
|
||||
return;
|
||||
}
|
||||
|
||||
// Update all internal state before the closeFuture is notified.
|
||||
LocalChannelRegistry.unregister(localAddress);
|
||||
localAddress = null;
|
||||
state = 2;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws Exception {
|
||||
// All internal state was updated already at doPreClose().
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Runnable doDeregister() throws Exception {
|
||||
((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
|
||||
|
Loading…
Reference in New Issue
Block a user