Deprecate PromiseAggregator
Motivation: PromiseAggregator's API allows for the aggregate promise to complete before the user is done adding promises. In order to support this use case the API structure would need to change in a breaking manner. Modifications: - Deprecate PromiseAggregator and subclasses - Introduce PromiseCombiner which corrects these issues Result: PromiseCombiner corrects the deficiencies in PromiseAggregator.
This commit is contained in:
parent
0a959efc9e
commit
e775b49e95
@ -20,6 +20,8 @@ import java.util.LinkedHashSet;
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @deprecated Use {@link PromiseCombiner}
|
||||||
|
*
|
||||||
* {@link GenericFutureListener} implementation which consolidates multiple {@link Future}s
|
* {@link GenericFutureListener} implementation which consolidates multiple {@link Future}s
|
||||||
* into one, by listening to individual {@link Future}s and producing an aggregated result
|
* into one, by listening to individual {@link Future}s and producing an aggregated result
|
||||||
* (success/failure) when all {@link Future}s have completed.
|
* (success/failure) when all {@link Future}s have completed.
|
||||||
@ -27,6 +29,7 @@ import java.util.Set;
|
|||||||
* @param V the type of value returned by the {@link Future}
|
* @param V the type of value returned by the {@link Future}
|
||||||
* @param F the type of {@link Future}
|
* @param F the type of {@link Future}
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public class PromiseAggregator<V, F extends Future<V>> implements GenericFutureListener<F> {
|
public class PromiseAggregator<V, F extends Future<V>> implements GenericFutureListener<F> {
|
||||||
|
|
||||||
private final Promise<?> aggregatePromise;
|
private final Promise<?> aggregatePromise;
|
||||||
|
@ -0,0 +1,75 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2016 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.util.concurrent;
|
||||||
|
|
||||||
|
import io.netty.util.internal.ObjectUtil;
|
||||||
|
|
||||||
|
public final class PromiseCombiner {
|
||||||
|
private int expectedCount;
|
||||||
|
private int doneCount;
|
||||||
|
private boolean doneAdding;
|
||||||
|
private Promise<Void> aggregatePromise;
|
||||||
|
private Throwable cause;
|
||||||
|
private final GenericFutureListener<Future<?>> listener = new GenericFutureListener<Future<?>>() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(Future<?> future) throws Exception {
|
||||||
|
++doneCount;
|
||||||
|
if (!future.isSuccess() && cause == null) {
|
||||||
|
cause = future.cause();
|
||||||
|
}
|
||||||
|
if (doneCount == expectedCount && doneAdding) {
|
||||||
|
tryPromise();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||||
|
public void add(Promise promise) {
|
||||||
|
checkAddAllowed();
|
||||||
|
++expectedCount;
|
||||||
|
promise.addListener(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||||
|
public void addAll(Promise... promises) {
|
||||||
|
checkAddAllowed();
|
||||||
|
expectedCount += promises.length;
|
||||||
|
for (Promise promise : promises) {
|
||||||
|
promise.addListener(listener);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void finish(Promise<Void> aggregatePromise) {
|
||||||
|
if (doneAdding) {
|
||||||
|
throw new IllegalStateException("Already finished");
|
||||||
|
}
|
||||||
|
doneAdding = true;
|
||||||
|
this.aggregatePromise = ObjectUtil.checkNotNull(aggregatePromise, "aggregatePromise");
|
||||||
|
if (doneCount == expectedCount) {
|
||||||
|
tryPromise();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean tryPromise() {
|
||||||
|
return (cause == null) ? aggregatePromise.trySuccess(null) : aggregatePromise.tryFailure(cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkAddAllowed() {
|
||||||
|
if (doneAdding) {
|
||||||
|
throw new IllegalStateException("Adding promises is not allowed after finished adding");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,194 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2016 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.util.concurrent;
|
||||||
|
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.MockitoAnnotations;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.eq;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
public class PromiseCombinerTest {
|
||||||
|
@Mock
|
||||||
|
private Promise<Void> p1;
|
||||||
|
private GenericFutureListener<Future<Void>> l1;
|
||||||
|
private GenericFutureListenerConsumer l1Consumer = new GenericFutureListenerConsumer() {
|
||||||
|
@Override
|
||||||
|
public void accept(GenericFutureListener<Future<Void>> listener) {
|
||||||
|
l1 = listener;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
@Mock
|
||||||
|
private Promise<Void> p2;
|
||||||
|
private GenericFutureListener<Future<Void>> l2;
|
||||||
|
private GenericFutureListenerConsumer l2Consumer = new GenericFutureListenerConsumer() {
|
||||||
|
@Override
|
||||||
|
public void accept(GenericFutureListener<Future<Void>> listener) {
|
||||||
|
l2 = listener;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
@Mock
|
||||||
|
private Promise<Void> p3;
|
||||||
|
private PromiseCombiner combiner;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
MockitoAnnotations.initMocks(this);
|
||||||
|
combiner = new PromiseCombiner();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNullAggregatePromise() {
|
||||||
|
combiner.finish(p1);
|
||||||
|
verify(p1).trySuccess(any(Void.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = NullPointerException.class)
|
||||||
|
public void testAddNullPromise() {
|
||||||
|
combiner.add(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = NullPointerException.class)
|
||||||
|
public void testAddAllNullPromise() {
|
||||||
|
combiner.addAll(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IllegalStateException.class)
|
||||||
|
public void testAddAfterFinish() {
|
||||||
|
combiner.finish(p1);
|
||||||
|
combiner.add(p2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Test(expected = IllegalStateException.class)
|
||||||
|
public void testAddAllAfterFinish() {
|
||||||
|
combiner.finish(p1);
|
||||||
|
combiner.addAll(p2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Test(expected = IllegalStateException.class)
|
||||||
|
public void testFinishCalledTwiceThrows() {
|
||||||
|
combiner.finish(p1);
|
||||||
|
combiner.finish(p1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddAllSuccess() throws Exception {
|
||||||
|
mockSuccessPromise(p1, l1Consumer);
|
||||||
|
mockSuccessPromise(p2, l2Consumer);
|
||||||
|
combiner.addAll(p1, p2);
|
||||||
|
combiner.finish(p3);
|
||||||
|
l1.operationComplete(p1);
|
||||||
|
verifyNotCompleted(p3);
|
||||||
|
l2.operationComplete(p2);
|
||||||
|
verifySuccess(p3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddSuccess() throws Exception {
|
||||||
|
mockSuccessPromise(p1, l1Consumer);
|
||||||
|
mockSuccessPromise(p2, l2Consumer);
|
||||||
|
combiner.add(p1);
|
||||||
|
l1.operationComplete(p1);
|
||||||
|
combiner.add(p2);
|
||||||
|
l2.operationComplete(p2);
|
||||||
|
verifyNotCompleted(p3);
|
||||||
|
combiner.finish(p3);
|
||||||
|
verifySuccess(p3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddAllFail() throws Exception {
|
||||||
|
RuntimeException e1 = new RuntimeException("fake exception 1");
|
||||||
|
RuntimeException e2 = new RuntimeException("fake exception 2");
|
||||||
|
mockFailedPromise(p1, e1, l1Consumer);
|
||||||
|
mockFailedPromise(p2, e2, l2Consumer);
|
||||||
|
combiner.addAll(p1, p2);
|
||||||
|
combiner.finish(p3);
|
||||||
|
l1.operationComplete(p1);
|
||||||
|
verifyNotCompleted(p3);
|
||||||
|
l2.operationComplete(p2);
|
||||||
|
verifyFail(p3, e1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddFail() throws Exception {
|
||||||
|
RuntimeException e1 = new RuntimeException("fake exception 1");
|
||||||
|
RuntimeException e2 = new RuntimeException("fake exception 2");
|
||||||
|
mockFailedPromise(p1, e1, l1Consumer);
|
||||||
|
mockFailedPromise(p2, e2, l2Consumer);
|
||||||
|
combiner.add(p1);
|
||||||
|
l1.operationComplete(p1);
|
||||||
|
combiner.add(p2);
|
||||||
|
l2.operationComplete(p2);
|
||||||
|
verifyNotCompleted(p3);
|
||||||
|
combiner.finish(p3);
|
||||||
|
verifyFail(p3, e1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyFail(Promise<Void> p, Throwable cause) {
|
||||||
|
verify(p).tryFailure(eq(cause));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifySuccess(Promise<Void> p) {
|
||||||
|
verify(p).trySuccess(any(Void.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyNotCompleted(Promise<Void> p) {
|
||||||
|
verify(p, never()).trySuccess(any(Void.class));
|
||||||
|
verify(p, never()).tryFailure(any(Throwable.class));
|
||||||
|
verify(p, never()).setSuccess(any(Void.class));
|
||||||
|
verify(p, never()).setFailure(any(Throwable.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mockSuccessPromise(Promise<Void> p, GenericFutureListenerConsumer consumer) {
|
||||||
|
when(p.isDone()).thenReturn(true);
|
||||||
|
when(p.isSuccess()).thenReturn(true);
|
||||||
|
mockListener(p, consumer);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mockFailedPromise(Promise<Void> p, Throwable cause, GenericFutureListenerConsumer consumer) {
|
||||||
|
when(p.isDone()).thenReturn(true);
|
||||||
|
when(p.isSuccess()).thenReturn(false);
|
||||||
|
when(p.cause()).thenReturn(cause);
|
||||||
|
mockListener(p, consumer);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private void mockListener(final Promise<Void> p, final GenericFutureListenerConsumer consumer) {
|
||||||
|
doAnswer(new Answer<Promise<Void>>() {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Override
|
||||||
|
public Promise<Void> answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
consumer.accept(invocation.getArgumentAt(0, GenericFutureListener.class));
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
}).when(p).addListener(any(GenericFutureListener.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
interface GenericFutureListenerConsumer {
|
||||||
|
void accept(GenericFutureListener<Future<Void>> listener);
|
||||||
|
}
|
||||||
|
}
|
@ -17,12 +17,16 @@
|
|||||||
package io.netty.channel;
|
package io.netty.channel;
|
||||||
|
|
||||||
import io.netty.util.concurrent.PromiseAggregator;
|
import io.netty.util.concurrent.PromiseAggregator;
|
||||||
|
import io.netty.util.concurrent.PromiseCombiner;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @deprecated Use {@link PromiseCombiner}
|
||||||
|
*
|
||||||
* Class which is used to consolidate multiple channel futures into one, by
|
* Class which is used to consolidate multiple channel futures into one, by
|
||||||
* listening to the individual futures and producing an aggregated result
|
* listening to the individual futures and producing an aggregated result
|
||||||
* (success/failure) when all futures have completed.
|
* (success/failure) when all futures have completed.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public final class ChannelPromiseAggregator
|
public final class ChannelPromiseAggregator
|
||||||
extends PromiseAggregator<Void, ChannelFuture>
|
extends PromiseAggregator<Void, ChannelFuture>
|
||||||
implements ChannelFutureListener {
|
implements ChannelFutureListener {
|
||||||
|
@ -17,6 +17,7 @@ package io.netty.channel;
|
|||||||
|
|
||||||
import io.netty.util.Recycler;
|
import io.netty.util.Recycler;
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
|
import io.netty.util.concurrent.PromiseCombiner;
|
||||||
import io.netty.util.internal.logging.InternalLogger;
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||||
|
|
||||||
@ -169,17 +170,22 @@ public final class PendingWriteQueue {
|
|||||||
size = 0;
|
size = 0;
|
||||||
|
|
||||||
ChannelPromise p = ctx.newPromise();
|
ChannelPromise p = ctx.newPromise();
|
||||||
ChannelPromiseAggregator aggregator = new ChannelPromiseAggregator(p);
|
PromiseCombiner combiner = new PromiseCombiner();
|
||||||
while (write != null) {
|
try {
|
||||||
PendingWrite next = write.next;
|
while (write != null) {
|
||||||
Object msg = write.msg;
|
PendingWrite next = write.next;
|
||||||
ChannelPromise promise = write.promise;
|
Object msg = write.msg;
|
||||||
recycle(write, false);
|
ChannelPromise promise = write.promise;
|
||||||
ctx.write(msg, promise);
|
recycle(write, false);
|
||||||
aggregator.add(promise);
|
combiner.add(promise);
|
||||||
write = next;
|
ctx.write(msg, promise);
|
||||||
|
write = next;
|
||||||
|
}
|
||||||
|
assertEmpty();
|
||||||
|
combiner.finish(p);
|
||||||
|
} catch (Throwable cause) {
|
||||||
|
p.setFailure(cause);
|
||||||
}
|
}
|
||||||
assertEmpty();
|
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user