Fix race condition of DefaultChannelGroup by introducing a closed flag.
Motivation: Doc of ChannelGroup says, that it can be used to manage server and child channels at once. However, in DefaultChannelGroup, there is a race condition. When a server channel accepts a child, it schedules its registration on an event loop, which takes some time. If the ChannelGroup, which is supposed to close server and child channels at once, is closed after the child channel has been scheduled for registration and before this registration actually happens, this child channel is not closed and remains connected. This could lead to connection leaks. Modifications: To fix this, the DefaultChannelGroup is changed to has a closed flag. This flag is set to true, just before the close() method is actually closing channels. The add() method checks after adding a new channel, if this flag has been set to true. If yes, the new channel is closed. If not, we have the guarantee, that this channel will be closed by the ChannelGroup, because setting the closed flag to true happens-before closing any channels. This behaviour can be activated by two new constructors. The old constructors are still there and behave like before. Therefore, no existing code should be affected directly. Result: If activating this feature, the DefaultChannelGroup can be used, for managing server and child channels at once. But this activating this feature means also, that a ChannelGroup cannot be reused after calling close().
This commit is contained in:
parent
b960939ea0
commit
93d2e86ed0
@ -50,13 +50,15 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
|
||||
remove(future.channel());
|
||||
}
|
||||
};
|
||||
private final boolean stayClosed;
|
||||
private volatile boolean closed;
|
||||
|
||||
/**
|
||||
* Creates a new group with a generated name and the provided {@link EventExecutor} to notify the
|
||||
* {@link ChannelGroupFuture}s.
|
||||
*/
|
||||
public DefaultChannelGroup(EventExecutor executor) {
|
||||
this("group-0x" + Integer.toHexString(nextId.incrementAndGet()), executor);
|
||||
this(executor, false);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -65,11 +67,33 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
|
||||
* duplicate check is done against group names.
|
||||
*/
|
||||
public DefaultChannelGroup(String name, EventExecutor executor) {
|
||||
this(name, executor, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new group with a generated name and the provided {@link EventExecutor} to notify the
|
||||
* {@link ChannelGroupFuture}s. {@code stayClosed} defines whether or not, this group can be closed
|
||||
* more than once. Adding channels to a closed group will immediately close them, too. This makes it
|
||||
* easy, to shutdown server and child channels at once.
|
||||
*/
|
||||
public DefaultChannelGroup(EventExecutor executor, boolean stayClosed) {
|
||||
this("group-0x" + Integer.toHexString(nextId.incrementAndGet()), executor, stayClosed);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new group with the specified {@code name} and {@link EventExecutor} to notify the
|
||||
* {@link ChannelGroupFuture}s. {@code stayClosed} defines whether or not, this group can be closed
|
||||
* more than once. Adding channels to a closed group will immediately close them, too. This makes it
|
||||
* easy, to shutdown server and child channels at once. Please note that different groups can have
|
||||
* the same name, which means no duplicate check is done against group names.
|
||||
*/
|
||||
public DefaultChannelGroup(String name, EventExecutor executor, boolean stayClosed) {
|
||||
if (name == null) {
|
||||
throw new NullPointerException("name");
|
||||
}
|
||||
this.name = name;
|
||||
this.executor = executor;
|
||||
this.stayClosed = stayClosed;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -110,6 +134,23 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
|
||||
if (added) {
|
||||
channel.closeFuture().addListener(remover);
|
||||
}
|
||||
|
||||
if (stayClosed && closed) {
|
||||
|
||||
// First add channel, than check if closed.
|
||||
// Seems inefficient at first, but this way a volatile
|
||||
// gives us enough synchronization to be thread-safe.
|
||||
//
|
||||
// If true: Close right away.
|
||||
// (Might be closed a second time by ChannelGroup.close(), but this is ok)
|
||||
//
|
||||
// If false: Channel will definitely be closed by the ChannelGroup.
|
||||
// (Because closed=true always happens-before ChannelGroup.close())
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/4020
|
||||
channel.close();
|
||||
}
|
||||
|
||||
return added;
|
||||
}
|
||||
|
||||
@ -261,6 +302,16 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
|
||||
Map<Channel, ChannelFuture> futures =
|
||||
new LinkedHashMap<Channel, ChannelFuture>(size());
|
||||
|
||||
if (stayClosed) {
|
||||
// It is important to set the closed to true, before closing channels.
|
||||
// Our invariants are:
|
||||
// closed=true happens-before ChannelGroup.close()
|
||||
// ChannelGroup.add() happens-before checking closed==true
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/4020
|
||||
closed = true;
|
||||
}
|
||||
|
||||
for (Channel c: serverChannels) {
|
||||
if (matcher.matches(c)) {
|
||||
futures.put(c, c.close());
|
||||
|
Loading…
Reference in New Issue
Block a user