Fixing isDone in SimpleChannelPromiseAggregator

Motivation:

The isDone method is currently broken in the aggregator because the doneAllocatingPromises accidentally calls the overridden version of setSuccess, rather than calling the base class version. This causes the base class's version to never be called since allowNotificationEvent will evaluate to false. This means that setSuccess0 will never be set, resulting in isDone always returning false.

Modifications:

Changed setSuccess() to call the base class when appropriate, regardless of the result of allowNotificationEvent.

Result:

isDone now behaves properly for the promise aggregator.
This commit is contained in:
nmittler 2015-05-04 15:18:12 -07:00
parent a3cea186ce
commit 60a94f0c5f
2 changed files with 84 additions and 266 deletions

View File

@ -225,7 +225,7 @@ public final class Http2CodecUtil {
doneAllocating = true;
if (successfulCount == expectedCount) {
promise.setSuccess();
return super.setSuccess();
return super.setSuccess(null);
}
}
return this;
@ -233,7 +233,7 @@ public final class Http2CodecUtil {
@Override
public boolean tryFailure(Throwable cause) {
if (allowNotificationEvent()) {
if (awaitingPromises()) {
++failureCount;
if (failureCount == 1) {
promise.tryFailure(cause);
@ -254,7 +254,7 @@ public final class Http2CodecUtil {
*/
@Override
public ChannelPromise setFailure(Throwable cause) {
if (allowNotificationEvent()) {
if (awaitingPromises()) {
++failureCount;
if (failureCount == 1) {
promise.setFailure(cause);
@ -264,13 +264,13 @@ public final class Http2CodecUtil {
return this;
}
private boolean allowNotificationEvent() {
private boolean awaitingPromises() {
return successfulCount + failureCount < expectedCount;
}
@Override
public ChannelPromise setSuccess(Void result) {
if (allowNotificationEvent()) {
if (awaitingPromises()) {
++successfulCount;
if (successfulCount == expectedCount && doneAllocating) {
promise.setSuccess(result);
@ -282,7 +282,7 @@ public final class Http2CodecUtil {
@Override
public boolean trySuccess(Void result) {
if (allowNotificationEvent()) {
if (awaitingPromises()) {
++successfulCount;
if (successfulCount == expectedCount && doneAllocating) {
promise.trySuccess(result);

View File

@ -18,27 +18,24 @@ package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_UNSIGNED_INT;
import static io.netty.handler.codec.http2.Http2TestUtil.as;
import static io.netty.handler.codec.http2.Http2TestUtil.randomString;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.EventExecutor;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
@ -54,8 +51,8 @@ public class DefaultHttp2FrameIOTest {
private DefaultHttp2FrameReader reader;
private DefaultHttp2FrameWriter writer;
private ByteBufAllocator alloc;
private CountDownLatch latch;
private ByteBuf buffer;
private ByteBuf data;
@Mock
private ChannelHandlerContext ctx;
@ -66,6 +63,9 @@ public class DefaultHttp2FrameIOTest {
@Mock
private ChannelPromise promise;
@Mock
private ChannelPromise aggregatePromise;
@Mock
private Channel channel;
@ -78,26 +78,23 @@ public class DefaultHttp2FrameIOTest {
alloc = UnpooledByteBufAllocator.DEFAULT;
buffer = alloc.buffer();
latch = new CountDownLatch(1);
data = dummyData();
when(executor.inEventLoop()).thenReturn(true);
when(ctx.alloc()).thenReturn(alloc);
when(ctx.channel()).thenReturn(channel);
when(ctx.executor()).thenReturn(executor);
doAnswer(new Answer<ChannelPromise>() {
when(ctx.newPromise()).thenReturn(promise);
when(promise.isDone()).thenReturn(true);
when(promise.isSuccess()).thenReturn(true);
doAnswer(new Answer<Void>() {
@Override
public ChannelPromise answer(InvocationOnMock invocation) throws Throwable {
return new DefaultChannelPromise(channel, executor);
public Void answer(InvocationOnMock in) throws Throwable {
ChannelFutureListener l = (ChannelFutureListener) in.getArguments()[0];
l.operationComplete(promise);
return null;
}
}).when(ctx).newPromise();
doAnswer(new Answer<ChannelPromise>() {
@Override
public ChannelPromise answer(InvocationOnMock in) throws Throwable {
latch.countDown();
return promise;
}
}).when(promise).setSuccess();
}).when(promise).addListener(any(ChannelFutureListener.class));
doAnswer(new Answer<ChannelFuture>() {
@Override
@ -121,97 +118,53 @@ public class DefaultHttp2FrameIOTest {
writer = new DefaultHttp2FrameWriter();
}
@After
public void tearDown() {
buffer.release();
data.release();
}
@Test
public void emptyDataShouldRoundtrip() throws Exception {
final ByteBuf data = Unpooled.EMPTY_BUFFER;
writer.writeData(ctx, 1000, data, 0, false, promise);
ByteBuf frame = null;
try {
frame = captureWrite();
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onDataRead(eq(ctx), eq(1000), eq(data), eq(0), eq(false));
} finally {
if (frame != null) {
frame.release();
}
data.release();
}
}
@Test
public void dataShouldRoundtrip() throws Exception {
final ByteBuf data = dummyData();
writer.writeData(ctx, 1000, data.retain().duplicate(), 0, false, promise);
ByteBuf frame = null;
try {
frame = captureWrite();
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onDataRead(eq(ctx), eq(1000), eq(data), eq(0), eq(false));
} finally {
if (frame != null) {
frame.release();
}
data.release();
}
}
@Test
public void dataWithPaddingShouldRoundtrip() throws Exception {
final ByteBuf data = dummyData();
writer.writeData(ctx, 1, data.retain().duplicate(), 0xFF, true, promise);
ByteBuf frame = null;
try {
frame = captureWrite();
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onDataRead(eq(ctx), eq(1), eq(data), eq(0xFF), eq(true));
} finally {
if (frame != null) {
frame.release();
}
data.release();
}
}
@Test
public void priorityShouldRoundtrip() throws Exception {
writer.writePriority(ctx, 1, 2, (short) 255, true, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onPriorityRead(eq(ctx), eq(1), eq(2), eq((short) 255), eq(true));
} finally {
frame.release();
}
}
@Test
public void rstStreamShouldRoundtrip() throws Exception {
writer.writeRstStream(ctx, 1, MAX_UNSIGNED_INT, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onRstStreamRead(eq(ctx), eq(1), eq(MAX_UNSIGNED_INT));
} finally {
frame.release();
}
}
@Test
public void emptySettingsShouldRoundtrip() throws Exception {
writer.writeSettings(ctx, new Http2Settings(), promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onSettingsRead(eq(ctx), eq(new Http2Settings()));
} finally {
frame.release();
}
}
@Test
@ -223,294 +176,159 @@ public class DefaultHttp2FrameIOTest {
settings.maxConcurrentStreams(456);
writer.writeSettings(ctx, settings, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onSettingsRead(eq(ctx), eq(settings));
} finally {
frame.release();
}
}
@Test
public void settingsAckShouldRoundtrip() throws Exception {
writer.writeSettingsAck(ctx, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onSettingsAckRead(eq(ctx));
} finally {
frame.release();
}
}
@Test
public void pingShouldRoundtrip() throws Exception {
ByteBuf data = dummyData();
writer.writePing(ctx, false, data.retain().duplicate(), promise);
ByteBuf frame = null;
try {
frame = captureWrite();
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onPingRead(eq(ctx), eq(data));
} finally {
if (frame != null) {
frame.release();
}
data.release();
}
}
@Test
public void pingAckShouldRoundtrip() throws Exception {
ByteBuf data = dummyData();
writer.writePing(ctx, true, data.retain().duplicate(), promise);
ByteBuf frame = null;
try {
frame = captureWrite();
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onPingAckRead(eq(ctx), eq(data));
} finally {
if (frame != null) {
frame.release();
}
data.release();
}
}
@Test
public void goAwayShouldRoundtrip() throws Exception {
ByteBuf data = dummyData();
writer.writeGoAway(ctx, 1, MAX_UNSIGNED_INT, data.retain().duplicate(), promise);
ByteBuf frame = null;
try {
frame = captureWrite();
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onGoAwayRead(eq(ctx), eq(1), eq(MAX_UNSIGNED_INT), eq(data));
} finally {
if (frame != null) {
frame.release();
}
data.release();
}
}
@Test
public void windowUpdateShouldRoundtrip() throws Exception {
writer.writeWindowUpdate(ctx, 1, Integer.MAX_VALUE, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onWindowUpdateRead(eq(ctx), eq(1), eq(Integer.MAX_VALUE));
} finally {
frame.release();
}
}
@Test
public void emptyHeadersShouldRoundtrip() throws Exception {
Http2Headers headers = EmptyHttp2Headers.INSTANCE;
writer.writeHeaders(ctx, 1, headers, 0, true, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0), eq(true));
} finally {
frame.release();
}
}
@Test
public void emptyHeadersWithPaddingShouldRoundtrip() throws Exception {
Http2Headers headers = EmptyHttp2Headers.INSTANCE;
writer.writeHeaders(ctx, 1, headers, 0xFF, true, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0xFF), eq(true));
} finally {
frame.release();
}
}
@Test
public void binaryHeadersWithoutPriorityShouldRoundtrip() throws Exception {
Http2Headers headers = dummyBinaryHeaders();
writer.writeHeaders(ctx, 1, headers, 0, true, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0), eq(true));
} finally {
frame.release();
}
}
@Test
public void headersWithoutPriorityShouldRoundtrip() throws Exception {
Http2Headers headers = dummyHeaders();
writer.writeHeaders(ctx, 1, headers, 0, true, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0), eq(true));
} finally {
frame.release();
}
}
@Test
public void headersWithPaddingWithoutPriorityShouldRoundtrip() throws Exception {
Http2Headers headers = dummyHeaders();
writer.writeHeaders(ctx, 1, headers, 0xFF, true, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0xFF), eq(true));
} finally {
frame.release();
}
}
@Test
public void headersWithPriorityShouldRoundtrip() throws Exception {
Http2Headers headers = dummyHeaders();
writer.writeHeaders(ctx, 1, headers, 2, (short) 3, true, 0, true, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener)
.onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0), eq(true));
} finally {
frame.release();
}
}
@Test
public void headersWithPaddingWithPriorityShouldRoundtrip() throws Exception {
Http2Headers headers = dummyHeaders();
writer.writeHeaders(ctx, 1, headers, 2, (short) 3, true, 0xFF, true, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0xFF),
eq(true));
} finally {
frame.release();
}
}
@Test
public void continuedHeadersShouldRoundtrip() throws Exception {
Http2Headers headers = largeHeaders();
writer.writeHeaders(ctx, 1, headers, 2, (short) 3, true, 0, true, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener)
.onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0), eq(true));
} finally {
frame.release();
}
}
@Test
public void continuedHeadersWithPaddingShouldRoundtrip() throws Exception {
Http2Headers headers = largeHeaders();
writer.writeHeaders(ctx, 1, headers, 2, (short) 3, true, 0xFF, true, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0xFF),
eq(true));
} finally {
frame.release();
}
}
@Test
public void emptypushPromiseShouldRoundtrip() throws Exception {
Http2Headers headers = EmptyHttp2Headers.INSTANCE;
writer.writePushPromise(ctx, 1, 2, headers, 0, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0));
} finally {
frame.release();
}
}
@Test
public void pushPromiseShouldRoundtrip() throws Exception {
Http2Headers headers = dummyHeaders();
writer.writePushPromise(ctx, 1, 2, headers, 0, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0));
} finally {
frame.release();
}
}
@Test
public void pushPromiseWithPaddingShouldRoundtrip() throws Exception {
Http2Headers headers = dummyHeaders();
writer.writePushPromise(ctx, 1, 2, headers, 0xFF, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0xFF));
} finally {
frame.release();
}
}
@Test
public void continuedPushPromiseShouldRoundtrip() throws Exception {
Http2Headers headers = largeHeaders();
writer.writePushPromise(ctx, 1, 2, headers, 0, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0));
frame.release();
}
@Test
public void continuedPushPromiseWithPaddingShouldRoundtrip() throws Exception {
Http2Headers headers = largeHeaders();
writer.writePushPromise(ctx, 1, 2, headers, 0xFF, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
reader.readFrame(ctx, buffer, listener);
verify(listener).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0xFF));
} finally {
frame.release();
}
}
private ByteBuf captureWrite() throws InterruptedException {
assertTrue(latch.await(2, TimeUnit.SECONDS));
return buffer;
}
private ByteBuf dummyData() {