From 8fbc513da7733af52a6c2d8b51932f844a52849f Mon Sep 17 00:00:00 2001 From: Sam Young Date: Thu, 18 Sep 2014 22:20:23 -0700 Subject: [PATCH] Add generic versions of PromiseAggregator and PromiseNotifier. Motivation: ChannelPromiseAggregator and ChannelPromiseNotifiers only allow consumers to work with Channels as the result type. Generic versions of these classes allow consumers to aggregate or broadcast the results of an asynchronous execution with other result types. Modifications: Add PromiseAggregator and PromiseNotifier. Add unit tests for both. Remove code in ChannelPromiseAggregator and ChannelPromiseNotifier and modify them to extend the new base classes. Result: Consumers can now aggregate or broadcast the results of an asynchronous execution with results types other than Channel. --- .../util/concurrent/PromiseAggregator.java | 111 +++++++++++++++ .../util/concurrent/PromiseNotifier.java | 62 ++++++++ .../concurrent/PromiseAggregatorTest.java | 132 ++++++++++++++++++ .../util/concurrent/PromiseNotifierTest.java | 90 ++++++++++++ .../channel/ChannelPromiseAggregator.java | 70 +--------- .../netty/channel/ChannelPromiseNotifier.java | 32 +---- 6 files changed, 406 insertions(+), 91 deletions(-) create mode 100644 common/src/main/java/io/netty/util/concurrent/PromiseAggregator.java create mode 100644 common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java create mode 100644 common/src/test/java/io/netty/util/concurrent/PromiseAggregatorTest.java create mode 100644 common/src/test/java/io/netty/util/concurrent/PromiseNotifierTest.java diff --git a/common/src/main/java/io/netty/util/concurrent/PromiseAggregator.java b/common/src/main/java/io/netty/util/concurrent/PromiseAggregator.java new file mode 100644 index 0000000000..62fb29e967 --- /dev/null +++ b/common/src/main/java/io/netty/util/concurrent/PromiseAggregator.java @@ -0,0 +1,111 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util.concurrent; + +import java.util.LinkedHashSet; +import java.util.Set; + +/** + * {@link GenericFutureListener} implementation which consolidates multiple {@link Future}s + * into one, by listening to individual {@link Future}s and producing an aggregated result + * (success/failure) when all {@link Future}s have completed. + * + * @param V the type of value returned by the {@link Future} + * @param F the type of {@link Future} + */ +public class PromiseAggregator> implements GenericFutureListener { + + private final Promise aggregatePromise; + private final boolean failPending; + private Set> pendingPromises; + + /** + * Creates a new instance. + * + * @param aggregatePromise the {@link Promise} to notify + * @param failPending {@code true} to fail pending promises, false to leave them unaffected + */ + public PromiseAggregator(Promise aggregatePromise, boolean failPending) { + if (aggregatePromise == null) { + throw new NullPointerException("aggregatePromise"); + } + this.aggregatePromise = aggregatePromise; + this.failPending = failPending; + } + + /** + * See {@link PromiseAggregator#PromiseAggregator(Promise, boolean)}. + * Defaults {@code failPending} to true. + */ + public PromiseAggregator(Promise aggregatePromise) { + this(aggregatePromise, true); + } + + /** + * Add the given {@link Promise}s to the aggregator. + */ + public final PromiseAggregator add(Promise... promises) { + if (promises == null) { + throw new NullPointerException("promises"); + } + if (promises.length == 0) { + return this; + } + synchronized (this) { + if (pendingPromises == null) { + int size; + if (promises.length > 1) { + size = promises.length; + } else { + size = 2; + } + pendingPromises = new LinkedHashSet>(size); + } + for (Promise p : promises) { + if (p == null) { + continue; + } + pendingPromises.add(p); + p.addListener(this); + } + } + return this; + } + + @Override + public synchronized void operationComplete(F future) throws Exception { + if (pendingPromises == null) { + aggregatePromise.setSuccess(null); + } else { + pendingPromises.remove(future); + if (!future.isSuccess()) { + Throwable cause = future.cause(); + aggregatePromise.setFailure(cause); + if (failPending) { + for (Promise pendingFuture : pendingPromises) { + pendingFuture.setFailure(cause); + } + } + } else { + if (pendingPromises.isEmpty()) { + aggregatePromise.setSuccess(null); + } + } + } + } + +} diff --git a/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java b/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java new file mode 100644 index 0000000000..0a2e3482fd --- /dev/null +++ b/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java @@ -0,0 +1,62 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.concurrent; + +/** + * {@link GenericFutureListener} implementation which takes other {@link Future}s + * and notifies them on completion. + * + * @param V the type of value returned by the future + * @param F the type of future + */ +public class PromiseNotifier> implements GenericFutureListener { + + private final Promise[] promises; + + /** + * Create a new instance. + * + * @param promises the {@link Promise}s to notify once this {@link GenericFutureListener} is notified. + */ + public PromiseNotifier(Promise... promises) { + if (promises == null) { + throw new NullPointerException("promises"); + } + for (Promise promise: promises) { + if (promise == null) { + throw new IllegalArgumentException("promises contains null Promise"); + } + } + this.promises = promises.clone(); + } + + @Override + public void operationComplete(F future) throws Exception { + if (future.isSuccess()) { + V result = future.get(); + for (Promise p: promises) { + p.setSuccess(result); + } + return; + } + + Throwable cause = future.cause(); + for (Promise p: promises) { + p.setFailure(cause); + } + } + +} diff --git a/common/src/test/java/io/netty/util/concurrent/PromiseAggregatorTest.java b/common/src/test/java/io/netty/util/concurrent/PromiseAggregatorTest.java new file mode 100644 index 0000000000..a081124625 --- /dev/null +++ b/common/src/test/java/io/netty/util/concurrent/PromiseAggregatorTest.java @@ -0,0 +1,132 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util.concurrent; + +import static org.easymock.EasyMock.*; +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class PromiseAggregatorTest { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testNullAggregatePromise() { + expectedException.expect(NullPointerException.class); + new PromiseAggregator>(null); + } + + @Test + public void testAddNullFuture() { + @SuppressWarnings("unchecked") + Promise p = createStrictMock(Promise.class); + PromiseAggregator> a = + new PromiseAggregator>(p); + expectedException.expect(NullPointerException.class); + a.add((Promise[]) null); + } + + @SuppressWarnings("unchecked") + @Test + public void testSucessfulNoPending() throws Exception { + Promise p = createStrictMock(Promise.class); + PromiseAggregator> a = + new PromiseAggregator>(p); + + Future future = createStrictMock(Future.class); + expect(p.setSuccess(null)).andReturn(p); + replay(future, p); + + a.add(); + a.operationComplete(future); + verify(future, p); + } + + @SuppressWarnings("unchecked") + @Test + public void testSuccessfulPending() throws Exception { + Promise p = createStrictMock(Promise.class); + PromiseAggregator> a = + new PromiseAggregator>(p); + Promise p1 = createStrictMock(Promise.class); + Promise p2 = createStrictMock(Promise.class); + + expect(p1.addListener(a)).andReturn(p1); + expect(p2.addListener(a)).andReturn(p2); + expect(p1.isSuccess()).andReturn(true); + expect(p2.isSuccess()).andReturn(true); + expect(p.setSuccess(null)).andReturn(p); + replay(p1, p2, p); + + assertThat(a.add(p1, null, p2), is(a)); + a.operationComplete(p1); + a.operationComplete(p2); + + verify(p1, p2, p); + } + + @SuppressWarnings("unchecked") + @Test + public void testFailedFutureFailPending() throws Exception { + Promise p = createStrictMock(Promise.class); + PromiseAggregator> a = + new PromiseAggregator>(p); + Promise p1 = createStrictMock(Promise.class); + Promise p2 = createStrictMock(Promise.class); + Throwable t = createStrictMock(Throwable.class); + + expect(p1.addListener(a)).andReturn(p1); + expect(p2.addListener(a)).andReturn(p2); + expect(p1.isSuccess()).andReturn(false); + expect(p1.cause()).andReturn(t); + expect(p.setFailure(t)).andReturn(p); + expect(p2.setFailure(t)).andReturn(p2); + replay(p1, p2, p); + + a.add(p1, p2); + a.operationComplete(p1); + verify(p1, p2, p); + } + + @SuppressWarnings("unchecked") + @Test + public void testFailedFutureNoFailPending() throws Exception { + Promise p = createStrictMock(Promise.class); + PromiseAggregator> a = + new PromiseAggregator>(p, false); + Promise p1 = createStrictMock(Promise.class); + Promise p2 = createStrictMock(Promise.class); + Throwable t = createStrictMock(Throwable.class); + + expect(p1.addListener(a)).andReturn(p1); + expect(p2.addListener(a)).andReturn(p2); + expect(p1.isSuccess()).andReturn(false); + expect(p1.cause()).andReturn(t); + expect(p.setFailure(t)).andReturn(p); + replay(p1, p2, p); + + a.add(p1, p2); + a.operationComplete(p1); + verify(p1, p2, p); + } + +} diff --git a/common/src/test/java/io/netty/util/concurrent/PromiseNotifierTest.java b/common/src/test/java/io/netty/util/concurrent/PromiseNotifierTest.java new file mode 100644 index 0000000000..e925f957bb --- /dev/null +++ b/common/src/test/java/io/netty/util/concurrent/PromiseNotifierTest.java @@ -0,0 +1,90 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util.concurrent; + +import static org.easymock.EasyMock.*; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class PromiseNotifierTest { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testNullPromisesArray() { + expectedException.expect(NullPointerException.class); + new PromiseNotifier>((Promise[]) null); + } + + @SuppressWarnings("unchecked") + @Test + public void testNullPromiseInArray() { + expectedException.expect(IllegalArgumentException.class); + new PromiseNotifier>((Promise) null); + } + + @Test + public void testListenerSuccess() throws Exception { + @SuppressWarnings("unchecked") + Promise p1 = createStrictMock(Promise.class); + @SuppressWarnings("unchecked") + Promise p2 = createStrictMock(Promise.class); + + @SuppressWarnings("unchecked") + PromiseNotifier> notifier = + new PromiseNotifier>(p1, p2); + + @SuppressWarnings("unchecked") + Future future = createStrictMock(Future.class); + expect(future.isSuccess()).andReturn(true); + expect(future.get()).andReturn(null); + expect(p1.setSuccess(null)).andReturn(p1); + expect(p2.setSuccess(null)).andReturn(p2); + replay(p1, p2, future); + + notifier.operationComplete(future); + verify(p1, p2); + } + + @Test + public void testListenerFailure() throws Exception { + @SuppressWarnings("unchecked") + Promise p1 = createStrictMock(Promise.class); + @SuppressWarnings("unchecked") + Promise p2 = createStrictMock(Promise.class); + + @SuppressWarnings("unchecked") + PromiseNotifier> notifier = + new PromiseNotifier>(p1, p2); + + @SuppressWarnings("unchecked") + Future future = createStrictMock(Future.class); + Throwable t = createStrictMock(Throwable.class); + expect(future.isSuccess()).andReturn(false); + expect(future.cause()).andReturn(t); + expect(p1.setFailure(t)).andReturn(p1); + expect(p2.setFailure(t)).andReturn(p2); + replay(p1, p2, future); + + notifier.operationComplete(future); + verify(p1, p2); + } + +} diff --git a/transport/src/main/java/io/netty/channel/ChannelPromiseAggregator.java b/transport/src/main/java/io/netty/channel/ChannelPromiseAggregator.java index 1ade63fe03..b882e7dbee 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPromiseAggregator.java +++ b/transport/src/main/java/io/netty/channel/ChannelPromiseAggregator.java @@ -16,79 +16,19 @@ package io.netty.channel; -import java.util.LinkedHashSet; -import java.util.Set; - +import io.netty.util.concurrent.PromiseAggregator; /** * Class which is used to consolidate multiple channel futures into one, by * listening to the individual futures and producing an aggregated result * (success/failure) when all futures have completed. */ -public final class ChannelPromiseAggregator implements ChannelFutureListener { +public final class ChannelPromiseAggregator + extends PromiseAggregator + implements ChannelFutureListener { - private final ChannelPromise aggregatePromise; - private Set pendingPromises; - - /** - * Instance an new {@link ChannelPromiseAggregator} - * - * @param aggregatePromise the {@link ChannelPromise} to notify - */ public ChannelPromiseAggregator(ChannelPromise aggregatePromise) { - if (aggregatePromise == null) { - throw new NullPointerException("aggregatePromise"); - } - this.aggregatePromise = aggregatePromise; + super(aggregatePromise); } - /** - * Add the given {@link ChannelPromise}s to the aggregator. - */ - public ChannelPromiseAggregator add(ChannelPromise... promises) { - if (promises == null) { - throw new NullPointerException("promises"); - } - if (promises.length == 0) { - return this; - } - synchronized (this) { - if (pendingPromises == null) { - int size; - if (promises.length > 1) { - size = promises.length; - } else { - size = 2; - } - pendingPromises = new LinkedHashSet(size); - } - for (ChannelPromise p: promises) { - if (p == null) { - continue; - } - pendingPromises.add(p); - p.addListener(this); - } - } - return this; - } - - @Override - public synchronized void operationComplete(ChannelFuture future) throws Exception { - if (pendingPromises == null) { - aggregatePromise.setSuccess(); - } else { - pendingPromises.remove(future); - if (!future.isSuccess()) { - aggregatePromise.setFailure(future.cause()); - for (ChannelPromise pendingFuture : pendingPromises) { - pendingFuture.setFailure(future.cause()); - } - } else { - if (pendingPromises.isEmpty()) { - aggregatePromise.setSuccess(); - } - } - } - } } diff --git a/transport/src/main/java/io/netty/channel/ChannelPromiseNotifier.java b/transport/src/main/java/io/netty/channel/ChannelPromiseNotifier.java index c3e40698eb..af2b890ced 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPromiseNotifier.java +++ b/transport/src/main/java/io/netty/channel/ChannelPromiseNotifier.java @@ -15,12 +15,14 @@ */ package io.netty.channel; +import io.netty.util.concurrent.PromiseNotifier; + /** * ChannelFutureListener implementation which takes other {@link ChannelFuture}(s) and notifies them on completion. */ -public final class ChannelPromiseNotifier implements ChannelFutureListener { - - private final ChannelPromise[] promises; +public final class ChannelPromiseNotifier + extends PromiseNotifier + implements ChannelFutureListener { /** * Create a new instance @@ -28,29 +30,7 @@ public final class ChannelPromiseNotifier implements ChannelFutureListener { * @param promises the {@link ChannelPromise}s to notify once this {@link ChannelFutureListener} is notified. */ public ChannelPromiseNotifier(ChannelPromise... promises) { - if (promises == null) { - throw new NullPointerException("promises"); - } - for (ChannelPromise promise: promises) { - if (promise == null) { - throw new IllegalArgumentException("promises contains null ChannelPromise"); - } - } - this.promises = promises.clone(); + super(promises); } - @Override - public void operationComplete(ChannelFuture cf) throws Exception { - if (cf.isSuccess()) { - for (ChannelPromise p: promises) { - p.setSuccess(); - } - return; - } - - Throwable cause = cf.cause(); - for (ChannelPromise p: promises) { - p.setFailure(cause); - } - } }