diff --git a/common/src/main/java/io/netty/util/concurrent/PromiseAggregator.java b/common/src/main/java/io/netty/util/concurrent/PromiseAggregator.java new file mode 100644 index 0000000000..62fb29e967 --- /dev/null +++ b/common/src/main/java/io/netty/util/concurrent/PromiseAggregator.java @@ -0,0 +1,111 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util.concurrent; + +import java.util.LinkedHashSet; +import java.util.Set; + +/** + * {@link GenericFutureListener} implementation which consolidates multiple {@link Future}s + * into one, by listening to individual {@link Future}s and producing an aggregated result + * (success/failure) when all {@link Future}s have completed. + * + * @param V the type of value returned by the {@link Future} + * @param F the type of {@link Future} + */ +public class PromiseAggregator> implements GenericFutureListener { + + private final Promise aggregatePromise; + private final boolean failPending; + private Set> pendingPromises; + + /** + * Creates a new instance. + * + * @param aggregatePromise the {@link Promise} to notify + * @param failPending {@code true} to fail pending promises, false to leave them unaffected + */ + public PromiseAggregator(Promise aggregatePromise, boolean failPending) { + if (aggregatePromise == null) { + throw new NullPointerException("aggregatePromise"); + } + this.aggregatePromise = aggregatePromise; + this.failPending = failPending; + } + + /** + * See {@link PromiseAggregator#PromiseAggregator(Promise, boolean)}. + * Defaults {@code failPending} to true. + */ + public PromiseAggregator(Promise aggregatePromise) { + this(aggregatePromise, true); + } + + /** + * Add the given {@link Promise}s to the aggregator. + */ + public final PromiseAggregator add(Promise... promises) { + if (promises == null) { + throw new NullPointerException("promises"); + } + if (promises.length == 0) { + return this; + } + synchronized (this) { + if (pendingPromises == null) { + int size; + if (promises.length > 1) { + size = promises.length; + } else { + size = 2; + } + pendingPromises = new LinkedHashSet>(size); + } + for (Promise p : promises) { + if (p == null) { + continue; + } + pendingPromises.add(p); + p.addListener(this); + } + } + return this; + } + + @Override + public synchronized void operationComplete(F future) throws Exception { + if (pendingPromises == null) { + aggregatePromise.setSuccess(null); + } else { + pendingPromises.remove(future); + if (!future.isSuccess()) { + Throwable cause = future.cause(); + aggregatePromise.setFailure(cause); + if (failPending) { + for (Promise pendingFuture : pendingPromises) { + pendingFuture.setFailure(cause); + } + } + } else { + if (pendingPromises.isEmpty()) { + aggregatePromise.setSuccess(null); + } + } + } + } + +} diff --git a/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java b/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java new file mode 100644 index 0000000000..0a2e3482fd --- /dev/null +++ b/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java @@ -0,0 +1,62 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.concurrent; + +/** + * {@link GenericFutureListener} implementation which takes other {@link Future}s + * and notifies them on completion. + * + * @param V the type of value returned by the future + * @param F the type of future + */ +public class PromiseNotifier> implements GenericFutureListener { + + private final Promise[] promises; + + /** + * Create a new instance. + * + * @param promises the {@link Promise}s to notify once this {@link GenericFutureListener} is notified. + */ + public PromiseNotifier(Promise... promises) { + if (promises == null) { + throw new NullPointerException("promises"); + } + for (Promise promise: promises) { + if (promise == null) { + throw new IllegalArgumentException("promises contains null Promise"); + } + } + this.promises = promises.clone(); + } + + @Override + public void operationComplete(F future) throws Exception { + if (future.isSuccess()) { + V result = future.get(); + for (Promise p: promises) { + p.setSuccess(result); + } + return; + } + + Throwable cause = future.cause(); + for (Promise p: promises) { + p.setFailure(cause); + } + } + +} diff --git a/common/src/test/java/io/netty/util/concurrent/PromiseAggregatorTest.java b/common/src/test/java/io/netty/util/concurrent/PromiseAggregatorTest.java new file mode 100644 index 0000000000..a081124625 --- /dev/null +++ b/common/src/test/java/io/netty/util/concurrent/PromiseAggregatorTest.java @@ -0,0 +1,132 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util.concurrent; + +import static org.easymock.EasyMock.*; +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class PromiseAggregatorTest { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testNullAggregatePromise() { + expectedException.expect(NullPointerException.class); + new PromiseAggregator>(null); + } + + @Test + public void testAddNullFuture() { + @SuppressWarnings("unchecked") + Promise p = createStrictMock(Promise.class); + PromiseAggregator> a = + new PromiseAggregator>(p); + expectedException.expect(NullPointerException.class); + a.add((Promise[]) null); + } + + @SuppressWarnings("unchecked") + @Test + public void testSucessfulNoPending() throws Exception { + Promise p = createStrictMock(Promise.class); + PromiseAggregator> a = + new PromiseAggregator>(p); + + Future future = createStrictMock(Future.class); + expect(p.setSuccess(null)).andReturn(p); + replay(future, p); + + a.add(); + a.operationComplete(future); + verify(future, p); + } + + @SuppressWarnings("unchecked") + @Test + public void testSuccessfulPending() throws Exception { + Promise p = createStrictMock(Promise.class); + PromiseAggregator> a = + new PromiseAggregator>(p); + Promise p1 = createStrictMock(Promise.class); + Promise p2 = createStrictMock(Promise.class); + + expect(p1.addListener(a)).andReturn(p1); + expect(p2.addListener(a)).andReturn(p2); + expect(p1.isSuccess()).andReturn(true); + expect(p2.isSuccess()).andReturn(true); + expect(p.setSuccess(null)).andReturn(p); + replay(p1, p2, p); + + assertThat(a.add(p1, null, p2), is(a)); + a.operationComplete(p1); + a.operationComplete(p2); + + verify(p1, p2, p); + } + + @SuppressWarnings("unchecked") + @Test + public void testFailedFutureFailPending() throws Exception { + Promise p = createStrictMock(Promise.class); + PromiseAggregator> a = + new PromiseAggregator>(p); + Promise p1 = createStrictMock(Promise.class); + Promise p2 = createStrictMock(Promise.class); + Throwable t = createStrictMock(Throwable.class); + + expect(p1.addListener(a)).andReturn(p1); + expect(p2.addListener(a)).andReturn(p2); + expect(p1.isSuccess()).andReturn(false); + expect(p1.cause()).andReturn(t); + expect(p.setFailure(t)).andReturn(p); + expect(p2.setFailure(t)).andReturn(p2); + replay(p1, p2, p); + + a.add(p1, p2); + a.operationComplete(p1); + verify(p1, p2, p); + } + + @SuppressWarnings("unchecked") + @Test + public void testFailedFutureNoFailPending() throws Exception { + Promise p = createStrictMock(Promise.class); + PromiseAggregator> a = + new PromiseAggregator>(p, false); + Promise p1 = createStrictMock(Promise.class); + Promise p2 = createStrictMock(Promise.class); + Throwable t = createStrictMock(Throwable.class); + + expect(p1.addListener(a)).andReturn(p1); + expect(p2.addListener(a)).andReturn(p2); + expect(p1.isSuccess()).andReturn(false); + expect(p1.cause()).andReturn(t); + expect(p.setFailure(t)).andReturn(p); + replay(p1, p2, p); + + a.add(p1, p2); + a.operationComplete(p1); + verify(p1, p2, p); + } + +} diff --git a/common/src/test/java/io/netty/util/concurrent/PromiseNotifierTest.java b/common/src/test/java/io/netty/util/concurrent/PromiseNotifierTest.java new file mode 100644 index 0000000000..e925f957bb --- /dev/null +++ b/common/src/test/java/io/netty/util/concurrent/PromiseNotifierTest.java @@ -0,0 +1,90 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util.concurrent; + +import static org.easymock.EasyMock.*; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class PromiseNotifierTest { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testNullPromisesArray() { + expectedException.expect(NullPointerException.class); + new PromiseNotifier>((Promise[]) null); + } + + @SuppressWarnings("unchecked") + @Test + public void testNullPromiseInArray() { + expectedException.expect(IllegalArgumentException.class); + new PromiseNotifier>((Promise) null); + } + + @Test + public void testListenerSuccess() throws Exception { + @SuppressWarnings("unchecked") + Promise p1 = createStrictMock(Promise.class); + @SuppressWarnings("unchecked") + Promise p2 = createStrictMock(Promise.class); + + @SuppressWarnings("unchecked") + PromiseNotifier> notifier = + new PromiseNotifier>(p1, p2); + + @SuppressWarnings("unchecked") + Future future = createStrictMock(Future.class); + expect(future.isSuccess()).andReturn(true); + expect(future.get()).andReturn(null); + expect(p1.setSuccess(null)).andReturn(p1); + expect(p2.setSuccess(null)).andReturn(p2); + replay(p1, p2, future); + + notifier.operationComplete(future); + verify(p1, p2); + } + + @Test + public void testListenerFailure() throws Exception { + @SuppressWarnings("unchecked") + Promise p1 = createStrictMock(Promise.class); + @SuppressWarnings("unchecked") + Promise p2 = createStrictMock(Promise.class); + + @SuppressWarnings("unchecked") + PromiseNotifier> notifier = + new PromiseNotifier>(p1, p2); + + @SuppressWarnings("unchecked") + Future future = createStrictMock(Future.class); + Throwable t = createStrictMock(Throwable.class); + expect(future.isSuccess()).andReturn(false); + expect(future.cause()).andReturn(t); + expect(p1.setFailure(t)).andReturn(p1); + expect(p2.setFailure(t)).andReturn(p2); + replay(p1, p2, future); + + notifier.operationComplete(future); + verify(p1, p2); + } + +} diff --git a/transport/src/main/java/io/netty/channel/ChannelPromiseAggregator.java b/transport/src/main/java/io/netty/channel/ChannelPromiseAggregator.java index 1ade63fe03..b882e7dbee 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPromiseAggregator.java +++ b/transport/src/main/java/io/netty/channel/ChannelPromiseAggregator.java @@ -16,79 +16,19 @@ package io.netty.channel; -import java.util.LinkedHashSet; -import java.util.Set; - +import io.netty.util.concurrent.PromiseAggregator; /** * Class which is used to consolidate multiple channel futures into one, by * listening to the individual futures and producing an aggregated result * (success/failure) when all futures have completed. */ -public final class ChannelPromiseAggregator implements ChannelFutureListener { +public final class ChannelPromiseAggregator + extends PromiseAggregator + implements ChannelFutureListener { - private final ChannelPromise aggregatePromise; - private Set pendingPromises; - - /** - * Instance an new {@link ChannelPromiseAggregator} - * - * @param aggregatePromise the {@link ChannelPromise} to notify - */ public ChannelPromiseAggregator(ChannelPromise aggregatePromise) { - if (aggregatePromise == null) { - throw new NullPointerException("aggregatePromise"); - } - this.aggregatePromise = aggregatePromise; + super(aggregatePromise); } - /** - * Add the given {@link ChannelPromise}s to the aggregator. - */ - public ChannelPromiseAggregator add(ChannelPromise... promises) { - if (promises == null) { - throw new NullPointerException("promises"); - } - if (promises.length == 0) { - return this; - } - synchronized (this) { - if (pendingPromises == null) { - int size; - if (promises.length > 1) { - size = promises.length; - } else { - size = 2; - } - pendingPromises = new LinkedHashSet(size); - } - for (ChannelPromise p: promises) { - if (p == null) { - continue; - } - pendingPromises.add(p); - p.addListener(this); - } - } - return this; - } - - @Override - public synchronized void operationComplete(ChannelFuture future) throws Exception { - if (pendingPromises == null) { - aggregatePromise.setSuccess(); - } else { - pendingPromises.remove(future); - if (!future.isSuccess()) { - aggregatePromise.setFailure(future.cause()); - for (ChannelPromise pendingFuture : pendingPromises) { - pendingFuture.setFailure(future.cause()); - } - } else { - if (pendingPromises.isEmpty()) { - aggregatePromise.setSuccess(); - } - } - } - } } diff --git a/transport/src/main/java/io/netty/channel/ChannelPromiseNotifier.java b/transport/src/main/java/io/netty/channel/ChannelPromiseNotifier.java index c3e40698eb..af2b890ced 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPromiseNotifier.java +++ b/transport/src/main/java/io/netty/channel/ChannelPromiseNotifier.java @@ -15,12 +15,14 @@ */ package io.netty.channel; +import io.netty.util.concurrent.PromiseNotifier; + /** * ChannelFutureListener implementation which takes other {@link ChannelFuture}(s) and notifies them on completion. */ -public final class ChannelPromiseNotifier implements ChannelFutureListener { - - private final ChannelPromise[] promises; +public final class ChannelPromiseNotifier + extends PromiseNotifier + implements ChannelFutureListener { /** * Create a new instance @@ -28,29 +30,7 @@ public final class ChannelPromiseNotifier implements ChannelFutureListener { * @param promises the {@link ChannelPromise}s to notify once this {@link ChannelFutureListener} is notified. */ public ChannelPromiseNotifier(ChannelPromise... promises) { - if (promises == null) { - throw new NullPointerException("promises"); - } - for (ChannelPromise promise: promises) { - if (promise == null) { - throw new IllegalArgumentException("promises contains null ChannelPromise"); - } - } - this.promises = promises.clone(); + super(promises); } - @Override - public void operationComplete(ChannelFuture cf) throws Exception { - if (cf.isSuccess()) { - for (ChannelPromise p: promises) { - p.setSuccess(); - } - return; - } - - Throwable cause = cf.cause(); - for (ChannelPromise p: promises) { - p.setFailure(cause); - } - } }