Make sure ChannelFutureAggregator is thread-safe and only hold the lock

as short as possible. This also change it to lazy init the HashSet that
holds the ChannelFuture's. See #90
This commit is contained in:
norman 2011-11-30 13:45:51 +01:00
parent da3a52778b
commit d2278a7d53

View File

@ -33,20 +33,24 @@ public class ChannelFutureAggregator implements ChannelFutureListener {
private final ChannelFuture aggregateFuture;
private final Set<ChannelFuture> pendingFutures;
private Set<ChannelFuture> pendingFutures;
public ChannelFutureAggregator(ChannelFuture aggregateFuture) {
this.aggregateFuture = aggregateFuture;
pendingFutures = new HashSet<ChannelFuture>();
}
public void addFuture(ChannelFuture future) {
pendingFutures.add(future);
synchronized(this) {
if (pendingFutures == null) {
pendingFutures = new HashSet<ChannelFuture>();
}
pendingFutures.add(future);
}
future.addListener(this);
}
@Override
public synchronized void operationComplete(ChannelFuture future)
public void operationComplete(ChannelFuture future)
throws Exception {
if (future.isCancelled()) {
// TODO: what should the correct behaviour be when a fragment is cancelled?
@ -54,17 +58,24 @@ public class ChannelFutureAggregator implements ChannelFutureListener {
return;
}
pendingFutures.remove(future);
if (!future.isSuccess()) {
aggregateFuture.setFailure(future.getCause());
for (ChannelFuture pendingFuture: pendingFutures) {
pendingFuture.cancel();
synchronized (this) {
if (pendingFutures == null) {
aggregateFuture.setSuccess();
} else {
pendingFutures.remove(future);
if (!future.isSuccess()) {
aggregateFuture.setFailure(future.getCause());
for (ChannelFuture pendingFuture: pendingFutures) {
pendingFuture.cancel();
}
} else {
if (pendingFutures.isEmpty()) {
aggregateFuture.setSuccess();
}
}
}
return;
}
if (pendingFutures.isEmpty()) {
aggregateFuture.setSuccess();
}
}
}