Fix a race condition where local channel's closeFuture is notified early
- Added AbstractChannel.doPreClose() to allow a transport to perform a task before closeFuture is notified
This commit is contained in:
parent
632542e0cd
commit
cf0259661e
@ -708,6 +708,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
protected abstract Runnable doRegister() throws Exception;
|
protected abstract Runnable doRegister() throws Exception;
|
||||||
protected abstract void doBind(SocketAddress localAddress) throws Exception;
|
protected abstract void doBind(SocketAddress localAddress) throws Exception;
|
||||||
protected abstract void doDisconnect() throws Exception;
|
protected abstract void doDisconnect() throws Exception;
|
||||||
|
protected void doPreClose() throws Exception {
|
||||||
|
// NOOP by default
|
||||||
|
}
|
||||||
|
|
||||||
protected abstract void doClose() throws Exception;
|
protected abstract void doClose() throws Exception;
|
||||||
protected abstract void doDeregister() throws Exception;
|
protected abstract void doDeregister() throws Exception;
|
||||||
protected void doFlushByteBuffer(ByteBuf buf) throws Exception {
|
protected void doFlushByteBuffer(ByteBuf buf) throws Exception {
|
||||||
@ -799,7 +803,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class CloseFuture extends DefaultChannelFuture implements ChannelFuture.Unsafe {
|
private final class CloseFuture extends DefaultChannelFuture implements ChannelFuture.Unsafe {
|
||||||
|
|
||||||
CloseFuture(AbstractChannel ch) {
|
CloseFuture(AbstractChannel ch) {
|
||||||
super(ch, false);
|
super(ch, false);
|
||||||
@ -816,6 +820,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
}
|
}
|
||||||
|
|
||||||
boolean setClosed() {
|
boolean setClosed() {
|
||||||
|
try {
|
||||||
|
doPreClose();
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.warn("doPreClose() raised an exception.", e);
|
||||||
|
}
|
||||||
return super.setSuccess();
|
return super.setSuccess();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,10 +17,10 @@ package io.netty.channel.local;
|
|||||||
|
|
||||||
import io.netty.channel.AbstractChannel;
|
import io.netty.channel.AbstractChannel;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.ChannelBufferType;
|
||||||
import io.netty.channel.ChannelConfig;
|
import io.netty.channel.ChannelConfig;
|
||||||
import io.netty.channel.ChannelException;
|
import io.netty.channel.ChannelException;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelBufferType;
|
|
||||||
import io.netty.channel.DefaultChannelConfig;
|
import io.netty.channel.DefaultChannelConfig;
|
||||||
import io.netty.channel.EventLoop;
|
import io.netty.channel.EventLoop;
|
||||||
import io.netty.channel.SingleThreadEventLoop;
|
import io.netty.channel.SingleThreadEventLoop;
|
||||||
@ -171,17 +171,22 @@ public class LocalChannel extends AbstractChannel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doClose() throws Exception {
|
protected void doPreClose() throws Exception {
|
||||||
if (state > 2) {
|
if (state > 2) {
|
||||||
// Closed already
|
// Closed already
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update all internal state before the closeFuture is notified.
|
||||||
if (parent() == null) {
|
if (parent() == null) {
|
||||||
LocalChannelRegistry.unregister(localAddress);
|
LocalChannelRegistry.unregister(localAddress);
|
||||||
}
|
}
|
||||||
localAddress = null;
|
localAddress = null;
|
||||||
state = 3;
|
state = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doClose() throws Exception {
|
||||||
if (peer.isActive()) {
|
if (peer.isActive()) {
|
||||||
peer.unsafe().close(peer.unsafe().voidFuture());
|
peer.unsafe().close(peer.unsafe().voidFuture());
|
||||||
peer = null;
|
peer = null;
|
||||||
|
@ -96,17 +96,23 @@ public class LocalServerChannel extends AbstractServerChannel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doClose() throws Exception {
|
protected void doPreClose() throws Exception {
|
||||||
if (state > 1) {
|
if (state > 1) {
|
||||||
// Closed already.
|
// Closed already.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update all internal state before the closeFuture is notified.
|
||||||
LocalChannelRegistry.unregister(localAddress);
|
LocalChannelRegistry.unregister(localAddress);
|
||||||
localAddress = null;
|
localAddress = null;
|
||||||
state = 2;
|
state = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doClose() throws Exception {
|
||||||
|
// All internal state was updated already at doPreClose().
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doDeregister() throws Exception {
|
protected void doDeregister() throws Exception {
|
||||||
((SingleThreadEventLoop) eventLoop()).removeShutdownHook(shutdownHook);
|
((SingleThreadEventLoop) eventLoop()).removeShutdownHook(shutdownHook);
|
||||||
|
Loading…
Reference in New Issue
Block a user