Fixed issue: NETTY-157 Channel.close() does not trigger a channelUnbound event in some transports
* Made sure the specified future is not marked as done before setClosed() is called. The specified future can be the closeFuture of the channel, consequently making setClosed() always fail.
This commit is contained in:
parent
f0a96e7417
commit
55cf5d67af
@ -88,7 +88,6 @@ final class DefaultLocalChannel extends AbstractChannel implements LocalChannel
|
||||
try {
|
||||
// Close the self.
|
||||
if (!setClosed()) {
|
||||
future.setSuccess();
|
||||
return;
|
||||
}
|
||||
|
||||
@ -115,6 +114,7 @@ final class DefaultLocalChannel extends AbstractChannel implements LocalChannel
|
||||
}
|
||||
fireChannelClosed(pairedChannel);
|
||||
} finally {
|
||||
future.setSuccess();
|
||||
if (localAddress != null) {
|
||||
LocalChannelRegistry.unregister(localAddress);
|
||||
}
|
||||
|
@ -136,15 +136,22 @@ final class LocalServerChannelSink extends AbstractChannelSink {
|
||||
}
|
||||
|
||||
private void close(DefaultLocalServerChannel channel, ChannelFuture future) {
|
||||
future.setSuccess();
|
||||
if (channel.setClosed()) {
|
||||
LocalAddress localAddress = channel.localAddress;
|
||||
if (channel.bound.compareAndSet(true, false)) {
|
||||
channel.localAddress = null;
|
||||
LocalChannelRegistry.unregister(localAddress);
|
||||
fireChannelUnbound(channel);
|
||||
try {
|
||||
if (channel.setClosed()) {
|
||||
future.setSuccess();
|
||||
LocalAddress localAddress = channel.localAddress;
|
||||
if (channel.bound.compareAndSet(true, false)) {
|
||||
channel.localAddress = null;
|
||||
LocalChannelRegistry.unregister(localAddress);
|
||||
fireChannelUnbound(channel);
|
||||
}
|
||||
fireChannelClosed(channel);
|
||||
} else {
|
||||
future.setSuccess();
|
||||
}
|
||||
fireChannelClosed(channel);
|
||||
} catch (Throwable t) {
|
||||
future.setFailure(t);
|
||||
fireExceptionCaught(channel, t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -142,8 +142,8 @@ final class HttpTunnelWorker implements Runnable {
|
||||
boolean bound = channel.isBound();
|
||||
try {
|
||||
channel.closeSocket();
|
||||
future.setSuccess();
|
||||
if (channel.setClosed()) {
|
||||
future.setSuccess();
|
||||
if (connected) {
|
||||
// Notify the worker so it stops reading.
|
||||
Thread currentThread = Thread.currentThread();
|
||||
@ -157,6 +157,8 @@ final class HttpTunnelWorker implements Runnable {
|
||||
fireChannelUnbound(channel);
|
||||
}
|
||||
fireChannelClosed(channel);
|
||||
} else {
|
||||
future.setSuccess();
|
||||
}
|
||||
}
|
||||
catch (Throwable t) {
|
||||
|
@ -191,8 +191,8 @@ class OioDatagramWorker implements Runnable {
|
||||
boolean bound = channel.isBound();
|
||||
try {
|
||||
channel.socket.close();
|
||||
future.setSuccess();
|
||||
if (channel.setClosed()) {
|
||||
future.setSuccess();
|
||||
if (connected) {
|
||||
// Notify the worker so it stops reading.
|
||||
Thread currentThread = Thread.currentThread();
|
||||
@ -206,6 +206,8 @@ class OioDatagramWorker implements Runnable {
|
||||
fireChannelUnbound(channel);
|
||||
}
|
||||
fireChannelClosed(channel);
|
||||
} else {
|
||||
future.setSuccess();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
future.setFailure(t);
|
||||
|
@ -171,12 +171,14 @@ class OioServerSocketPipelineSink extends AbstractChannelSink {
|
||||
boolean bound = channel.isBound();
|
||||
try {
|
||||
channel.socket.close();
|
||||
future.setSuccess();
|
||||
if (channel.setClosed()) {
|
||||
future.setSuccess();
|
||||
if (bound) {
|
||||
fireChannelUnbound(channel);
|
||||
}
|
||||
fireChannelClosed(channel);
|
||||
} else {
|
||||
future.setSuccess();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
future.setFailure(t);
|
||||
|
@ -180,8 +180,8 @@ class OioWorker implements Runnable {
|
||||
boolean bound = channel.isBound();
|
||||
try {
|
||||
channel.socket.close();
|
||||
future.setSuccess();
|
||||
if (channel.setClosed()) {
|
||||
future.setSuccess();
|
||||
if (connected) {
|
||||
// Notify the worker so it stops reading.
|
||||
Thread currentThread = Thread.currentThread();
|
||||
@ -195,6 +195,8 @@ class OioWorker implements Runnable {
|
||||
fireChannelUnbound(channel);
|
||||
}
|
||||
fireChannelClosed(channel);
|
||||
} else {
|
||||
future.setSuccess();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
future.setFailure(t);
|
||||
|
@ -183,18 +183,24 @@ class BaseXnioChannel extends AbstractChannel implements XnioChannel {
|
||||
return;
|
||||
}
|
||||
|
||||
IoUtils.safeClose(xnioChannel);
|
||||
xnioChannel = null;
|
||||
XnioChannelRegistry.unregisterChannelMapping(this);
|
||||
try {
|
||||
IoUtils.safeClose(xnioChannel);
|
||||
xnioChannel = null;
|
||||
XnioChannelRegistry.unregisterChannelMapping(this);
|
||||
|
||||
if (remoteAddress != null) {
|
||||
fireChannelDisconnected(this);
|
||||
}
|
||||
if (localAddress != null) {
|
||||
fireChannelUnbound(this);
|
||||
}
|
||||
future.setSuccess();
|
||||
if (remoteAddress != null) {
|
||||
fireChannelDisconnected(this);
|
||||
}
|
||||
if (localAddress != null) {
|
||||
fireChannelUnbound(this);
|
||||
}
|
||||
|
||||
fireChannelClosed(this);
|
||||
fireChannelClosed(this);
|
||||
} catch (Throwable t) {
|
||||
future.setFailure(t);
|
||||
fireExceptionCaught(this, t);
|
||||
}
|
||||
}
|
||||
|
||||
private final class WriteBuffer extends LinkedTransferQueue<MessageEvent> {
|
||||
|
@ -126,8 +126,8 @@ final class DefaultXnioServerChannel extends BaseXnioChannel implements XnioServ
|
||||
SocketAddress localAddress = getLocalAddress();
|
||||
boolean bound = localAddress != null;
|
||||
try {
|
||||
future.setSuccess();
|
||||
if (setClosed()) {
|
||||
future.setSuccess();
|
||||
synchronized (bindLock) {
|
||||
IoUtils.safeClose(xnioChannel);
|
||||
XnioChannelRegistry.unregisterServerChannel(localAddress);
|
||||
@ -138,6 +138,8 @@ final class DefaultXnioServerChannel extends BaseXnioChannel implements XnioServ
|
||||
fireChannelUnbound(this);
|
||||
}
|
||||
fireChannelClosed(this);
|
||||
} else {
|
||||
future.setSuccess();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
future.setFailure(t);
|
||||
|
Loading…
Reference in New Issue
Block a user