From d2278a7d53b14680f56fbe37921c613ffeedabe1 Mon Sep 17 00:00:00 2001 From: norman Date: Wed, 30 Nov 2011 13:45:51 +0100 Subject: [PATCH] Make sure ChannelFutureAggregator is thread-safe and only hold the lock as short as possible. This also change it to lazy init the HashSet that holds the ChannelFuture's. See #90 --- .../channel/ChannelFutureAggregator.java | 37 ++++++++++++------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/jboss/netty/channel/ChannelFutureAggregator.java b/src/main/java/org/jboss/netty/channel/ChannelFutureAggregator.java index bf5c5a3677..54f2801070 100644 --- a/src/main/java/org/jboss/netty/channel/ChannelFutureAggregator.java +++ b/src/main/java/org/jboss/netty/channel/ChannelFutureAggregator.java @@ -33,20 +33,24 @@ public class ChannelFutureAggregator implements ChannelFutureListener { private final ChannelFuture aggregateFuture; - private final Set pendingFutures; + private Set pendingFutures; public ChannelFutureAggregator(ChannelFuture aggregateFuture) { this.aggregateFuture = aggregateFuture; - pendingFutures = new HashSet(); } public void addFuture(ChannelFuture future) { - pendingFutures.add(future); + synchronized(this) { + if (pendingFutures == null) { + pendingFutures = new HashSet(); + } + pendingFutures.add(future); + } future.addListener(this); } @Override - public synchronized void operationComplete(ChannelFuture future) + public void operationComplete(ChannelFuture future) throws Exception { if (future.isCancelled()) { // TODO: what should the correct behaviour be when a fragment is cancelled? @@ -54,17 +58,24 @@ public class ChannelFutureAggregator implements ChannelFutureListener { return; } - pendingFutures.remove(future); - if (!future.isSuccess()) { - aggregateFuture.setFailure(future.getCause()); - for (ChannelFuture pendingFuture: pendingFutures) { - pendingFuture.cancel(); + synchronized (this) { + if (pendingFutures == null) { + aggregateFuture.setSuccess(); + } else { + pendingFutures.remove(future); + if (!future.isSuccess()) { + aggregateFuture.setFailure(future.getCause()); + for (ChannelFuture pendingFuture: pendingFutures) { + pendingFuture.cancel(); + } + } else { + if (pendingFutures.isEmpty()) { + aggregateFuture.setSuccess(); + } + } } - return; - } - if (pendingFutures.isEmpty()) { - aggregateFuture.setSuccess(); } + } } \ No newline at end of file