Add generic versions of PromiseAggregator and PromiseNotifier.

Motivation:

ChannelPromiseAggregator and ChannelPromiseNotifiers only allow
consumers to work with Channels as the result type. Generic versions
of these classes allow consumers to aggregate or broadcast the results
of an asynchronous execution with other result types.

Modifications:

Add PromiseAggregator and PromiseNotifier. Add unit tests for both.
Remove code in ChannelPromiseAggregator and ChannelPromiseNotifier and
modify them to extend the new base classes.

Result:

Consumers can now aggregate or broadcast the results of an asynchronous
execution with results types other than Channel.
This commit is contained in:
Sam Young 2014-09-18 22:20:23 -07:00 committed by Norman Maurer
parent 7da5ca3629
commit bb94f05083
6 changed files with 406 additions and 91 deletions

View File

@ -0,0 +1,111 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import java.util.LinkedHashSet;
import java.util.Set;
/**
* {@link GenericFutureListener} implementation which consolidates multiple {@link Future}s
* into one, by listening to individual {@link Future}s and producing an aggregated result
* (success/failure) when all {@link Future}s have completed.
*
* @param V the type of value returned by the {@link Future}
* @param F the type of {@link Future}
*/
public class PromiseAggregator<V, F extends Future<V>> implements GenericFutureListener<F> {
private final Promise<?> aggregatePromise;
private final boolean failPending;
private Set<Promise<V>> pendingPromises;
/**
* Creates a new instance.
*
* @param aggregatePromise the {@link Promise} to notify
* @param failPending {@code true} to fail pending promises, false to leave them unaffected
*/
public PromiseAggregator(Promise<Void> aggregatePromise, boolean failPending) {
if (aggregatePromise == null) {
throw new NullPointerException("aggregatePromise");
}
this.aggregatePromise = aggregatePromise;
this.failPending = failPending;
}
/**
* See {@link PromiseAggregator#PromiseAggregator(Promise, boolean)}.
* Defaults {@code failPending} to true.
*/
public PromiseAggregator(Promise<Void> aggregatePromise) {
this(aggregatePromise, true);
}
/**
* Add the given {@link Promise}s to the aggregator.
*/
public final PromiseAggregator<V, F> add(Promise<V>... promises) {
if (promises == null) {
throw new NullPointerException("promises");
}
if (promises.length == 0) {
return this;
}
synchronized (this) {
if (pendingPromises == null) {
int size;
if (promises.length > 1) {
size = promises.length;
} else {
size = 2;
}
pendingPromises = new LinkedHashSet<Promise<V>>(size);
}
for (Promise<V> p : promises) {
if (p == null) {
continue;
}
pendingPromises.add(p);
p.addListener(this);
}
}
return this;
}
@Override
public synchronized void operationComplete(F future) throws Exception {
if (pendingPromises == null) {
aggregatePromise.setSuccess(null);
} else {
pendingPromises.remove(future);
if (!future.isSuccess()) {
Throwable cause = future.cause();
aggregatePromise.setFailure(cause);
if (failPending) {
for (Promise<V> pendingFuture : pendingPromises) {
pendingFuture.setFailure(cause);
}
}
} else {
if (pendingPromises.isEmpty()) {
aggregatePromise.setSuccess(null);
}
}
}
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
/**
* {@link GenericFutureListener} implementation which takes other {@link Future}s
* and notifies them on completion.
*
* @param V the type of value returned by the future
* @param F the type of future
*/
public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureListener<F> {
private final Promise<? super V>[] promises;
/**
* Create a new instance.
*
* @param promises the {@link Promise}s to notify once this {@link GenericFutureListener} is notified.
*/
public PromiseNotifier(Promise<? super V>... promises) {
if (promises == null) {
throw new NullPointerException("promises");
}
for (Promise<? super V> promise: promises) {
if (promise == null) {
throw new IllegalArgumentException("promises contains null Promise");
}
}
this.promises = promises.clone();
}
@Override
public void operationComplete(F future) throws Exception {
if (future.isSuccess()) {
V result = future.get();
for (Promise<? super V> p: promises) {
p.setSuccess(result);
}
return;
}
Throwable cause = future.cause();
for (Promise<? super V> p: promises) {
p.setFailure(cause);
}
}
}

View File

@ -0,0 +1,132 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import static org.easymock.EasyMock.*;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class PromiseAggregatorTest {
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testNullAggregatePromise() {
expectedException.expect(NullPointerException.class);
new PromiseAggregator<Void, Future<Void>>(null);
}
@Test
public void testAddNullFuture() {
@SuppressWarnings("unchecked")
Promise<Void> p = createStrictMock(Promise.class);
PromiseAggregator<Void, Future<Void>> a =
new PromiseAggregator<Void, Future<Void>>(p);
expectedException.expect(NullPointerException.class);
a.add((Promise<Void>[]) null);
}
@SuppressWarnings("unchecked")
@Test
public void testSucessfulNoPending() throws Exception {
Promise<Void> p = createStrictMock(Promise.class);
PromiseAggregator<Void, Future<Void>> a =
new PromiseAggregator<Void, Future<Void>>(p);
Future<Void> future = createStrictMock(Future.class);
expect(p.setSuccess(null)).andReturn(p);
replay(future, p);
a.add();
a.operationComplete(future);
verify(future, p);
}
@SuppressWarnings("unchecked")
@Test
public void testSuccessfulPending() throws Exception {
Promise<Void> p = createStrictMock(Promise.class);
PromiseAggregator<Void, Future<Void>> a =
new PromiseAggregator<Void, Future<Void>>(p);
Promise<Void> p1 = createStrictMock(Promise.class);
Promise<Void> p2 = createStrictMock(Promise.class);
expect(p1.addListener(a)).andReturn(p1);
expect(p2.addListener(a)).andReturn(p2);
expect(p1.isSuccess()).andReturn(true);
expect(p2.isSuccess()).andReturn(true);
expect(p.setSuccess(null)).andReturn(p);
replay(p1, p2, p);
assertThat(a.add(p1, null, p2), is(a));
a.operationComplete(p1);
a.operationComplete(p2);
verify(p1, p2, p);
}
@SuppressWarnings("unchecked")
@Test
public void testFailedFutureFailPending() throws Exception {
Promise<Void> p = createStrictMock(Promise.class);
PromiseAggregator<Void, Future<Void>> a =
new PromiseAggregator<Void, Future<Void>>(p);
Promise<Void> p1 = createStrictMock(Promise.class);
Promise<Void> p2 = createStrictMock(Promise.class);
Throwable t = createStrictMock(Throwable.class);
expect(p1.addListener(a)).andReturn(p1);
expect(p2.addListener(a)).andReturn(p2);
expect(p1.isSuccess()).andReturn(false);
expect(p1.cause()).andReturn(t);
expect(p.setFailure(t)).andReturn(p);
expect(p2.setFailure(t)).andReturn(p2);
replay(p1, p2, p);
a.add(p1, p2);
a.operationComplete(p1);
verify(p1, p2, p);
}
@SuppressWarnings("unchecked")
@Test
public void testFailedFutureNoFailPending() throws Exception {
Promise<Void> p = createStrictMock(Promise.class);
PromiseAggregator<Void, Future<Void>> a =
new PromiseAggregator<Void, Future<Void>>(p, false);
Promise<Void> p1 = createStrictMock(Promise.class);
Promise<Void> p2 = createStrictMock(Promise.class);
Throwable t = createStrictMock(Throwable.class);
expect(p1.addListener(a)).andReturn(p1);
expect(p2.addListener(a)).andReturn(p2);
expect(p1.isSuccess()).andReturn(false);
expect(p1.cause()).andReturn(t);
expect(p.setFailure(t)).andReturn(p);
replay(p1, p2, p);
a.add(p1, p2);
a.operationComplete(p1);
verify(p1, p2, p);
}
}

View File

@ -0,0 +1,90 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import static org.easymock.EasyMock.*;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class PromiseNotifierTest {
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testNullPromisesArray() {
expectedException.expect(NullPointerException.class);
new PromiseNotifier<Void, Future<Void>>((Promise<Void>[]) null);
}
@SuppressWarnings("unchecked")
@Test
public void testNullPromiseInArray() {
expectedException.expect(IllegalArgumentException.class);
new PromiseNotifier<Void, Future<Void>>((Promise<Void>) null);
}
@Test
public void testListenerSuccess() throws Exception {
@SuppressWarnings("unchecked")
Promise<Void> p1 = createStrictMock(Promise.class);
@SuppressWarnings("unchecked")
Promise<Void> p2 = createStrictMock(Promise.class);
@SuppressWarnings("unchecked")
PromiseNotifier<Void, Future<Void>> notifier =
new PromiseNotifier<Void, Future<Void>>(p1, p2);
@SuppressWarnings("unchecked")
Future<Void> future = createStrictMock(Future.class);
expect(future.isSuccess()).andReturn(true);
expect(future.get()).andReturn(null);
expect(p1.setSuccess(null)).andReturn(p1);
expect(p2.setSuccess(null)).andReturn(p2);
replay(p1, p2, future);
notifier.operationComplete(future);
verify(p1, p2);
}
@Test
public void testListenerFailure() throws Exception {
@SuppressWarnings("unchecked")
Promise<Void> p1 = createStrictMock(Promise.class);
@SuppressWarnings("unchecked")
Promise<Void> p2 = createStrictMock(Promise.class);
@SuppressWarnings("unchecked")
PromiseNotifier<Void, Future<Void>> notifier =
new PromiseNotifier<Void, Future<Void>>(p1, p2);
@SuppressWarnings("unchecked")
Future<Void> future = createStrictMock(Future.class);
Throwable t = createStrictMock(Throwable.class);
expect(future.isSuccess()).andReturn(false);
expect(future.cause()).andReturn(t);
expect(p1.setFailure(t)).andReturn(p1);
expect(p2.setFailure(t)).andReturn(p2);
replay(p1, p2, future);
notifier.operationComplete(future);
verify(p1, p2);
}
}

View File

@ -16,79 +16,19 @@
package io.netty.channel;
import java.util.LinkedHashSet;
import java.util.Set;
import io.netty.util.concurrent.PromiseAggregator;
/**
* Class which is used to consolidate multiple channel futures into one, by
* listening to the individual futures and producing an aggregated result
* (success/failure) when all futures have completed.
*/
public final class ChannelPromiseAggregator implements ChannelFutureListener {
public final class ChannelPromiseAggregator
extends PromiseAggregator<Void, ChannelFuture>
implements ChannelFutureListener {
private final ChannelPromise aggregatePromise;
private Set<ChannelPromise> pendingPromises;
/**
* Instance an new {@link ChannelPromiseAggregator}
*
* @param aggregatePromise the {@link ChannelPromise} to notify
*/
public ChannelPromiseAggregator(ChannelPromise aggregatePromise) {
if (aggregatePromise == null) {
throw new NullPointerException("aggregatePromise");
}
this.aggregatePromise = aggregatePromise;
super(aggregatePromise);
}
/**
* Add the given {@link ChannelPromise}s to the aggregator.
*/
public ChannelPromiseAggregator add(ChannelPromise... promises) {
if (promises == null) {
throw new NullPointerException("promises");
}
if (promises.length == 0) {
return this;
}
synchronized (this) {
if (pendingPromises == null) {
int size;
if (promises.length > 1) {
size = promises.length;
} else {
size = 2;
}
pendingPromises = new LinkedHashSet<ChannelPromise>(size);
}
for (ChannelPromise p: promises) {
if (p == null) {
continue;
}
pendingPromises.add(p);
p.addListener(this);
}
}
return this;
}
@Override
public synchronized void operationComplete(ChannelFuture future) throws Exception {
if (pendingPromises == null) {
aggregatePromise.setSuccess();
} else {
pendingPromises.remove(future);
if (!future.isSuccess()) {
aggregatePromise.setFailure(future.cause());
for (ChannelPromise pendingFuture : pendingPromises) {
pendingFuture.setFailure(future.cause());
}
} else {
if (pendingPromises.isEmpty()) {
aggregatePromise.setSuccess();
}
}
}
}
}

View File

@ -15,12 +15,14 @@
*/
package io.netty.channel;
import io.netty.util.concurrent.PromiseNotifier;
/**
* ChannelFutureListener implementation which takes other {@link ChannelFuture}(s) and notifies them on completion.
*/
public final class ChannelPromiseNotifier implements ChannelFutureListener {
private final ChannelPromise[] promises;
public final class ChannelPromiseNotifier
extends PromiseNotifier<Void, ChannelFuture>
implements ChannelFutureListener {
/**
* Create a new instance
@ -28,29 +30,7 @@ public final class ChannelPromiseNotifier implements ChannelFutureListener {
* @param promises the {@link ChannelPromise}s to notify once this {@link ChannelFutureListener} is notified.
*/
public ChannelPromiseNotifier(ChannelPromise... promises) {
if (promises == null) {
throw new NullPointerException("promises");
}
for (ChannelPromise promise: promises) {
if (promise == null) {
throw new IllegalArgumentException("promises contains null ChannelPromise");
}
}
this.promises = promises.clone();
super(promises);
}
@Override
public void operationComplete(ChannelFuture cf) throws Exception {
if (cf.isSuccess()) {
for (ChannelPromise p: promises) {
p.setSuccess();
}
return;
}
Throwable cause = cf.cause();
for (ChannelPromise p: promises) {
p.setFailure(cause);
}
}
}