Add generic versions of PromiseAggregator and PromiseNotifier.
Motivation: ChannelPromiseAggregator and ChannelPromiseNotifiers only allow consumers to work with Channels as the result type. Generic versions of these classes allow consumers to aggregate or broadcast the results of an asynchronous execution with other result types. Modifications: Add PromiseAggregator and PromiseNotifier. Add unit tests for both. Remove code in ChannelPromiseAggregator and ChannelPromiseNotifier and modify them to extend the new base classes. Result: Consumers can now aggregate or broadcast the results of an asynchronous execution with results types other than Channel.
This commit is contained in:
parent
685075f1a0
commit
8fbc513da7
@ -0,0 +1,111 @@
|
||||
/*
|
||||
* Copyright 2014 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* {@link GenericFutureListener} implementation which consolidates multiple {@link Future}s
|
||||
* into one, by listening to individual {@link Future}s and producing an aggregated result
|
||||
* (success/failure) when all {@link Future}s have completed.
|
||||
*
|
||||
* @param V the type of value returned by the {@link Future}
|
||||
* @param F the type of {@link Future}
|
||||
*/
|
||||
public class PromiseAggregator<V, F extends Future<V>> implements GenericFutureListener<F> {
|
||||
|
||||
private final Promise<?> aggregatePromise;
|
||||
private final boolean failPending;
|
||||
private Set<Promise<V>> pendingPromises;
|
||||
|
||||
/**
|
||||
* Creates a new instance.
|
||||
*
|
||||
* @param aggregatePromise the {@link Promise} to notify
|
||||
* @param failPending {@code true} to fail pending promises, false to leave them unaffected
|
||||
*/
|
||||
public PromiseAggregator(Promise<Void> aggregatePromise, boolean failPending) {
|
||||
if (aggregatePromise == null) {
|
||||
throw new NullPointerException("aggregatePromise");
|
||||
}
|
||||
this.aggregatePromise = aggregatePromise;
|
||||
this.failPending = failPending;
|
||||
}
|
||||
|
||||
/**
|
||||
* See {@link PromiseAggregator#PromiseAggregator(Promise, boolean)}.
|
||||
* Defaults {@code failPending} to true.
|
||||
*/
|
||||
public PromiseAggregator(Promise<Void> aggregatePromise) {
|
||||
this(aggregatePromise, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the given {@link Promise}s to the aggregator.
|
||||
*/
|
||||
public final PromiseAggregator<V, F> add(Promise<V>... promises) {
|
||||
if (promises == null) {
|
||||
throw new NullPointerException("promises");
|
||||
}
|
||||
if (promises.length == 0) {
|
||||
return this;
|
||||
}
|
||||
synchronized (this) {
|
||||
if (pendingPromises == null) {
|
||||
int size;
|
||||
if (promises.length > 1) {
|
||||
size = promises.length;
|
||||
} else {
|
||||
size = 2;
|
||||
}
|
||||
pendingPromises = new LinkedHashSet<Promise<V>>(size);
|
||||
}
|
||||
for (Promise<V> p : promises) {
|
||||
if (p == null) {
|
||||
continue;
|
||||
}
|
||||
pendingPromises.add(p);
|
||||
p.addListener(this);
|
||||
}
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void operationComplete(F future) throws Exception {
|
||||
if (pendingPromises == null) {
|
||||
aggregatePromise.setSuccess(null);
|
||||
} else {
|
||||
pendingPromises.remove(future);
|
||||
if (!future.isSuccess()) {
|
||||
Throwable cause = future.cause();
|
||||
aggregatePromise.setFailure(cause);
|
||||
if (failPending) {
|
||||
for (Promise<V> pendingFuture : pendingPromises) {
|
||||
pendingFuture.setFailure(cause);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (pendingPromises.isEmpty()) {
|
||||
aggregatePromise.setSuccess(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,62 @@
|
||||
/*
|
||||
* Copyright 2014 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
/**
|
||||
* {@link GenericFutureListener} implementation which takes other {@link Future}s
|
||||
* and notifies them on completion.
|
||||
*
|
||||
* @param V the type of value returned by the future
|
||||
* @param F the type of future
|
||||
*/
|
||||
public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureListener<F> {
|
||||
|
||||
private final Promise<? super V>[] promises;
|
||||
|
||||
/**
|
||||
* Create a new instance.
|
||||
*
|
||||
* @param promises the {@link Promise}s to notify once this {@link GenericFutureListener} is notified.
|
||||
*/
|
||||
public PromiseNotifier(Promise<? super V>... promises) {
|
||||
if (promises == null) {
|
||||
throw new NullPointerException("promises");
|
||||
}
|
||||
for (Promise<? super V> promise: promises) {
|
||||
if (promise == null) {
|
||||
throw new IllegalArgumentException("promises contains null Promise");
|
||||
}
|
||||
}
|
||||
this.promises = promises.clone();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void operationComplete(F future) throws Exception {
|
||||
if (future.isSuccess()) {
|
||||
V result = future.get();
|
||||
for (Promise<? super V> p: promises) {
|
||||
p.setSuccess(result);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
Throwable cause = future.cause();
|
||||
for (Promise<? super V> p: promises) {
|
||||
p.setFailure(cause);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,132 @@
|
||||
/*
|
||||
* Copyright 2014 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
import static org.easymock.EasyMock.*;
|
||||
import static org.hamcrest.CoreMatchers.*;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
public class PromiseAggregatorTest {
|
||||
|
||||
@Rule
|
||||
public ExpectedException expectedException = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void testNullAggregatePromise() {
|
||||
expectedException.expect(NullPointerException.class);
|
||||
new PromiseAggregator<Void, Future<Void>>(null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddNullFuture() {
|
||||
@SuppressWarnings("unchecked")
|
||||
Promise<Void> p = createStrictMock(Promise.class);
|
||||
PromiseAggregator<Void, Future<Void>> a =
|
||||
new PromiseAggregator<Void, Future<Void>>(p);
|
||||
expectedException.expect(NullPointerException.class);
|
||||
a.add((Promise<Void>[]) null);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testSucessfulNoPending() throws Exception {
|
||||
Promise<Void> p = createStrictMock(Promise.class);
|
||||
PromiseAggregator<Void, Future<Void>> a =
|
||||
new PromiseAggregator<Void, Future<Void>>(p);
|
||||
|
||||
Future<Void> future = createStrictMock(Future.class);
|
||||
expect(p.setSuccess(null)).andReturn(p);
|
||||
replay(future, p);
|
||||
|
||||
a.add();
|
||||
a.operationComplete(future);
|
||||
verify(future, p);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testSuccessfulPending() throws Exception {
|
||||
Promise<Void> p = createStrictMock(Promise.class);
|
||||
PromiseAggregator<Void, Future<Void>> a =
|
||||
new PromiseAggregator<Void, Future<Void>>(p);
|
||||
Promise<Void> p1 = createStrictMock(Promise.class);
|
||||
Promise<Void> p2 = createStrictMock(Promise.class);
|
||||
|
||||
expect(p1.addListener(a)).andReturn(p1);
|
||||
expect(p2.addListener(a)).andReturn(p2);
|
||||
expect(p1.isSuccess()).andReturn(true);
|
||||
expect(p2.isSuccess()).andReturn(true);
|
||||
expect(p.setSuccess(null)).andReturn(p);
|
||||
replay(p1, p2, p);
|
||||
|
||||
assertThat(a.add(p1, null, p2), is(a));
|
||||
a.operationComplete(p1);
|
||||
a.operationComplete(p2);
|
||||
|
||||
verify(p1, p2, p);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testFailedFutureFailPending() throws Exception {
|
||||
Promise<Void> p = createStrictMock(Promise.class);
|
||||
PromiseAggregator<Void, Future<Void>> a =
|
||||
new PromiseAggregator<Void, Future<Void>>(p);
|
||||
Promise<Void> p1 = createStrictMock(Promise.class);
|
||||
Promise<Void> p2 = createStrictMock(Promise.class);
|
||||
Throwable t = createStrictMock(Throwable.class);
|
||||
|
||||
expect(p1.addListener(a)).andReturn(p1);
|
||||
expect(p2.addListener(a)).andReturn(p2);
|
||||
expect(p1.isSuccess()).andReturn(false);
|
||||
expect(p1.cause()).andReturn(t);
|
||||
expect(p.setFailure(t)).andReturn(p);
|
||||
expect(p2.setFailure(t)).andReturn(p2);
|
||||
replay(p1, p2, p);
|
||||
|
||||
a.add(p1, p2);
|
||||
a.operationComplete(p1);
|
||||
verify(p1, p2, p);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testFailedFutureNoFailPending() throws Exception {
|
||||
Promise<Void> p = createStrictMock(Promise.class);
|
||||
PromiseAggregator<Void, Future<Void>> a =
|
||||
new PromiseAggregator<Void, Future<Void>>(p, false);
|
||||
Promise<Void> p1 = createStrictMock(Promise.class);
|
||||
Promise<Void> p2 = createStrictMock(Promise.class);
|
||||
Throwable t = createStrictMock(Throwable.class);
|
||||
|
||||
expect(p1.addListener(a)).andReturn(p1);
|
||||
expect(p2.addListener(a)).andReturn(p2);
|
||||
expect(p1.isSuccess()).andReturn(false);
|
||||
expect(p1.cause()).andReturn(t);
|
||||
expect(p.setFailure(t)).andReturn(p);
|
||||
replay(p1, p2, p);
|
||||
|
||||
a.add(p1, p2);
|
||||
a.operationComplete(p1);
|
||||
verify(p1, p2, p);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,90 @@
|
||||
/*
|
||||
* Copyright 2014 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
import static org.easymock.EasyMock.*;
|
||||
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
public class PromiseNotifierTest {
|
||||
|
||||
@Rule
|
||||
public ExpectedException expectedException = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void testNullPromisesArray() {
|
||||
expectedException.expect(NullPointerException.class);
|
||||
new PromiseNotifier<Void, Future<Void>>((Promise<Void>[]) null);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testNullPromiseInArray() {
|
||||
expectedException.expect(IllegalArgumentException.class);
|
||||
new PromiseNotifier<Void, Future<Void>>((Promise<Void>) null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListenerSuccess() throws Exception {
|
||||
@SuppressWarnings("unchecked")
|
||||
Promise<Void> p1 = createStrictMock(Promise.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
Promise<Void> p2 = createStrictMock(Promise.class);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
PromiseNotifier<Void, Future<Void>> notifier =
|
||||
new PromiseNotifier<Void, Future<Void>>(p1, p2);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
Future<Void> future = createStrictMock(Future.class);
|
||||
expect(future.isSuccess()).andReturn(true);
|
||||
expect(future.get()).andReturn(null);
|
||||
expect(p1.setSuccess(null)).andReturn(p1);
|
||||
expect(p2.setSuccess(null)).andReturn(p2);
|
||||
replay(p1, p2, future);
|
||||
|
||||
notifier.operationComplete(future);
|
||||
verify(p1, p2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListenerFailure() throws Exception {
|
||||
@SuppressWarnings("unchecked")
|
||||
Promise<Void> p1 = createStrictMock(Promise.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
Promise<Void> p2 = createStrictMock(Promise.class);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
PromiseNotifier<Void, Future<Void>> notifier =
|
||||
new PromiseNotifier<Void, Future<Void>>(p1, p2);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
Future<Void> future = createStrictMock(Future.class);
|
||||
Throwable t = createStrictMock(Throwable.class);
|
||||
expect(future.isSuccess()).andReturn(false);
|
||||
expect(future.cause()).andReturn(t);
|
||||
expect(p1.setFailure(t)).andReturn(p1);
|
||||
expect(p2.setFailure(t)).andReturn(p2);
|
||||
replay(p1, p2, future);
|
||||
|
||||
notifier.operationComplete(future);
|
||||
verify(p1, p2);
|
||||
}
|
||||
|
||||
}
|
@ -16,79 +16,19 @@
|
||||
|
||||
package io.netty.channel;
|
||||
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import io.netty.util.concurrent.PromiseAggregator;
|
||||
|
||||
/**
|
||||
* Class which is used to consolidate multiple channel futures into one, by
|
||||
* listening to the individual futures and producing an aggregated result
|
||||
* (success/failure) when all futures have completed.
|
||||
*/
|
||||
public final class ChannelPromiseAggregator implements ChannelFutureListener {
|
||||
public final class ChannelPromiseAggregator
|
||||
extends PromiseAggregator<Void, ChannelFuture>
|
||||
implements ChannelFutureListener {
|
||||
|
||||
private final ChannelPromise aggregatePromise;
|
||||
private Set<ChannelPromise> pendingPromises;
|
||||
|
||||
/**
|
||||
* Instance an new {@link ChannelPromiseAggregator}
|
||||
*
|
||||
* @param aggregatePromise the {@link ChannelPromise} to notify
|
||||
*/
|
||||
public ChannelPromiseAggregator(ChannelPromise aggregatePromise) {
|
||||
if (aggregatePromise == null) {
|
||||
throw new NullPointerException("aggregatePromise");
|
||||
}
|
||||
this.aggregatePromise = aggregatePromise;
|
||||
super(aggregatePromise);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the given {@link ChannelPromise}s to the aggregator.
|
||||
*/
|
||||
public ChannelPromiseAggregator add(ChannelPromise... promises) {
|
||||
if (promises == null) {
|
||||
throw new NullPointerException("promises");
|
||||
}
|
||||
if (promises.length == 0) {
|
||||
return this;
|
||||
}
|
||||
synchronized (this) {
|
||||
if (pendingPromises == null) {
|
||||
int size;
|
||||
if (promises.length > 1) {
|
||||
size = promises.length;
|
||||
} else {
|
||||
size = 2;
|
||||
}
|
||||
pendingPromises = new LinkedHashSet<ChannelPromise>(size);
|
||||
}
|
||||
for (ChannelPromise p: promises) {
|
||||
if (p == null) {
|
||||
continue;
|
||||
}
|
||||
pendingPromises.add(p);
|
||||
p.addListener(this);
|
||||
}
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (pendingPromises == null) {
|
||||
aggregatePromise.setSuccess();
|
||||
} else {
|
||||
pendingPromises.remove(future);
|
||||
if (!future.isSuccess()) {
|
||||
aggregatePromise.setFailure(future.cause());
|
||||
for (ChannelPromise pendingFuture : pendingPromises) {
|
||||
pendingFuture.setFailure(future.cause());
|
||||
}
|
||||
} else {
|
||||
if (pendingPromises.isEmpty()) {
|
||||
aggregatePromise.setSuccess();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -15,12 +15,14 @@
|
||||
*/
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.util.concurrent.PromiseNotifier;
|
||||
|
||||
/**
|
||||
* ChannelFutureListener implementation which takes other {@link ChannelFuture}(s) and notifies them on completion.
|
||||
*/
|
||||
public final class ChannelPromiseNotifier implements ChannelFutureListener {
|
||||
|
||||
private final ChannelPromise[] promises;
|
||||
public final class ChannelPromiseNotifier
|
||||
extends PromiseNotifier<Void, ChannelFuture>
|
||||
implements ChannelFutureListener {
|
||||
|
||||
/**
|
||||
* Create a new instance
|
||||
@ -28,29 +30,7 @@ public final class ChannelPromiseNotifier implements ChannelFutureListener {
|
||||
* @param promises the {@link ChannelPromise}s to notify once this {@link ChannelFutureListener} is notified.
|
||||
*/
|
||||
public ChannelPromiseNotifier(ChannelPromise... promises) {
|
||||
if (promises == null) {
|
||||
throw new NullPointerException("promises");
|
||||
}
|
||||
for (ChannelPromise promise: promises) {
|
||||
if (promise == null) {
|
||||
throw new IllegalArgumentException("promises contains null ChannelPromise");
|
||||
}
|
||||
}
|
||||
this.promises = promises.clone();
|
||||
super(promises);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture cf) throws Exception {
|
||||
if (cf.isSuccess()) {
|
||||
for (ChannelPromise p: promises) {
|
||||
p.setSuccess();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
Throwable cause = cf.cause();
|
||||
for (ChannelPromise p: promises) {
|
||||
p.setFailure(cause);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user