From 60a94f0c5fcc5ea34f1ef048732f779b1afa50a2 Mon Sep 17 00:00:00 2001 From: nmittler Date: Mon, 4 May 2015 15:18:12 -0700 Subject: [PATCH] Fixing isDone in SimpleChannelPromiseAggregator Motivation: The isDone method is currently broken in the aggregator because the doneAllocatingPromises accidentally calls the overridden version of setSuccess, rather than calling the base class version. This causes the base class's version to never be called since allowNotificationEvent will evaluate to false. This means that setSuccess0 will never be set, resulting in isDone always returning false. Modifications: Changed setSuccess() to call the base class when appropriate, regardless of the result of allowNotificationEvent. Result: isDone now behaves properly for the promise aggregator. --- .../handler/codec/http2/Http2CodecUtil.java | 12 +- .../codec/http2/DefaultHttp2FrameIOTest.java | 338 ++++-------------- 2 files changed, 84 insertions(+), 266 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java index 87735a5326..b24fbcaada 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java @@ -225,7 +225,7 @@ public final class Http2CodecUtil { doneAllocating = true; if (successfulCount == expectedCount) { promise.setSuccess(); - return super.setSuccess(); + return super.setSuccess(null); } } return this; @@ -233,7 +233,7 @@ public final class Http2CodecUtil { @Override public boolean tryFailure(Throwable cause) { - if (allowNotificationEvent()) { + if (awaitingPromises()) { ++failureCount; if (failureCount == 1) { promise.tryFailure(cause); @@ -254,7 +254,7 @@ public final class Http2CodecUtil { */ @Override public ChannelPromise setFailure(Throwable cause) { - if (allowNotificationEvent()) { + if (awaitingPromises()) { ++failureCount; if (failureCount == 1) { promise.setFailure(cause); @@ -264,13 +264,13 @@ public final class Http2CodecUtil { return this; } - private boolean allowNotificationEvent() { + private boolean awaitingPromises() { return successfulCount + failureCount < expectedCount; } @Override public ChannelPromise setSuccess(Void result) { - if (allowNotificationEvent()) { + if (awaitingPromises()) { ++successfulCount; if (successfulCount == expectedCount && doneAllocating) { promise.setSuccess(result); @@ -282,7 +282,7 @@ public final class Http2CodecUtil { @Override public boolean trySuccess(Void result) { - if (allowNotificationEvent()) { + if (awaitingPromises()) { ++successfulCount; if (successfulCount == expectedCount && doneAllocating) { promise.trySuccess(result); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2FrameIOTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2FrameIOTest.java index ef98c58372..306ab30b07 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2FrameIOTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2FrameIOTest.java @@ -18,27 +18,24 @@ package io.netty.handler.codec.http2; import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_UNSIGNED_INT; import static io.netty.handler.codec.http2.Http2TestUtil.as; import static io.netty.handler.codec.http2.Http2TestUtil.randomString; -import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.channel.DefaultChannelPromise; import io.netty.util.CharsetUtil; import io.netty.util.concurrent.EventExecutor; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -54,8 +51,8 @@ public class DefaultHttp2FrameIOTest { private DefaultHttp2FrameReader reader; private DefaultHttp2FrameWriter writer; private ByteBufAllocator alloc; - private CountDownLatch latch; private ByteBuf buffer; + private ByteBuf data; @Mock private ChannelHandlerContext ctx; @@ -66,6 +63,9 @@ public class DefaultHttp2FrameIOTest { @Mock private ChannelPromise promise; + @Mock + private ChannelPromise aggregatePromise; + @Mock private Channel channel; @@ -78,26 +78,23 @@ public class DefaultHttp2FrameIOTest { alloc = UnpooledByteBufAllocator.DEFAULT; buffer = alloc.buffer(); - latch = new CountDownLatch(1); + data = dummyData(); when(executor.inEventLoop()).thenReturn(true); when(ctx.alloc()).thenReturn(alloc); when(ctx.channel()).thenReturn(channel); when(ctx.executor()).thenReturn(executor); - doAnswer(new Answer() { + when(ctx.newPromise()).thenReturn(promise); + when(promise.isDone()).thenReturn(true); + when(promise.isSuccess()).thenReturn(true); + doAnswer(new Answer() { @Override - public ChannelPromise answer(InvocationOnMock invocation) throws Throwable { - return new DefaultChannelPromise(channel, executor); + public Void answer(InvocationOnMock in) throws Throwable { + ChannelFutureListener l = (ChannelFutureListener) in.getArguments()[0]; + l.operationComplete(promise); + return null; } - }).when(ctx).newPromise(); - - doAnswer(new Answer() { - @Override - public ChannelPromise answer(InvocationOnMock in) throws Throwable { - latch.countDown(); - return promise; - } - }).when(promise).setSuccess(); + }).when(promise).addListener(any(ChannelFutureListener.class)); doAnswer(new Answer() { @Override @@ -121,97 +118,53 @@ public class DefaultHttp2FrameIOTest { writer = new DefaultHttp2FrameWriter(); } + @After + public void tearDown() { + buffer.release(); + data.release(); + } + @Test public void emptyDataShouldRoundtrip() throws Exception { final ByteBuf data = Unpooled.EMPTY_BUFFER; writer.writeData(ctx, 1000, data, 0, false, promise); - - ByteBuf frame = null; - try { - frame = captureWrite(); - reader.readFrame(ctx, frame, listener); - verify(listener).onDataRead(eq(ctx), eq(1000), eq(data), eq(0), eq(false)); - } finally { - if (frame != null) { - frame.release(); - } - data.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onDataRead(eq(ctx), eq(1000), eq(data), eq(0), eq(false)); } @Test public void dataShouldRoundtrip() throws Exception { - final ByteBuf data = dummyData(); writer.writeData(ctx, 1000, data.retain().duplicate(), 0, false, promise); - - ByteBuf frame = null; - try { - frame = captureWrite(); - reader.readFrame(ctx, frame, listener); - verify(listener).onDataRead(eq(ctx), eq(1000), eq(data), eq(0), eq(false)); - } finally { - if (frame != null) { - frame.release(); - } - data.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onDataRead(eq(ctx), eq(1000), eq(data), eq(0), eq(false)); } @Test public void dataWithPaddingShouldRoundtrip() throws Exception { - final ByteBuf data = dummyData(); writer.writeData(ctx, 1, data.retain().duplicate(), 0xFF, true, promise); - - ByteBuf frame = null; - try { - frame = captureWrite(); - reader.readFrame(ctx, frame, listener); - verify(listener).onDataRead(eq(ctx), eq(1), eq(data), eq(0xFF), eq(true)); - } finally { - if (frame != null) { - frame.release(); - } - data.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onDataRead(eq(ctx), eq(1), eq(data), eq(0xFF), eq(true)); } @Test public void priorityShouldRoundtrip() throws Exception { writer.writePriority(ctx, 1, 2, (short) 255, true, promise); - - ByteBuf frame = captureWrite(); - try { - reader.readFrame(ctx, frame, listener); - verify(listener).onPriorityRead(eq(ctx), eq(1), eq(2), eq((short) 255), eq(true)); - } finally { - frame.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onPriorityRead(eq(ctx), eq(1), eq(2), eq((short) 255), eq(true)); } @Test public void rstStreamShouldRoundtrip() throws Exception { writer.writeRstStream(ctx, 1, MAX_UNSIGNED_INT, promise); - - ByteBuf frame = captureWrite(); - try { - reader.readFrame(ctx, frame, listener); - verify(listener).onRstStreamRead(eq(ctx), eq(1), eq(MAX_UNSIGNED_INT)); - } finally { - frame.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onRstStreamRead(eq(ctx), eq(1), eq(MAX_UNSIGNED_INT)); } @Test public void emptySettingsShouldRoundtrip() throws Exception { writer.writeSettings(ctx, new Http2Settings(), promise); - - ByteBuf frame = captureWrite(); - try { - reader.readFrame(ctx, frame, listener); - verify(listener).onSettingsRead(eq(ctx), eq(new Http2Settings())); - } finally { - frame.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onSettingsRead(eq(ctx), eq(new Http2Settings())); } @Test @@ -223,294 +176,159 @@ public class DefaultHttp2FrameIOTest { settings.maxConcurrentStreams(456); writer.writeSettings(ctx, settings, promise); - - ByteBuf frame = captureWrite(); - try { - reader.readFrame(ctx, frame, listener); - verify(listener).onSettingsRead(eq(ctx), eq(settings)); - } finally { - frame.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onSettingsRead(eq(ctx), eq(settings)); } @Test public void settingsAckShouldRoundtrip() throws Exception { writer.writeSettingsAck(ctx, promise); - - ByteBuf frame = captureWrite(); - try { - reader.readFrame(ctx, frame, listener); - verify(listener).onSettingsAckRead(eq(ctx)); - } finally { - frame.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onSettingsAckRead(eq(ctx)); } @Test public void pingShouldRoundtrip() throws Exception { - ByteBuf data = dummyData(); writer.writePing(ctx, false, data.retain().duplicate(), promise); - - ByteBuf frame = null; - try { - frame = captureWrite(); - reader.readFrame(ctx, frame, listener); - verify(listener).onPingRead(eq(ctx), eq(data)); - } finally { - if (frame != null) { - frame.release(); - } - data.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onPingRead(eq(ctx), eq(data)); } @Test public void pingAckShouldRoundtrip() throws Exception { - ByteBuf data = dummyData(); writer.writePing(ctx, true, data.retain().duplicate(), promise); - - ByteBuf frame = null; - try { - frame = captureWrite(); - reader.readFrame(ctx, frame, listener); - verify(listener).onPingAckRead(eq(ctx), eq(data)); - } finally { - if (frame != null) { - frame.release(); - } - data.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onPingAckRead(eq(ctx), eq(data)); } @Test public void goAwayShouldRoundtrip() throws Exception { - ByteBuf data = dummyData(); writer.writeGoAway(ctx, 1, MAX_UNSIGNED_INT, data.retain().duplicate(), promise); - - ByteBuf frame = null; - try { - frame = captureWrite(); - reader.readFrame(ctx, frame, listener); - verify(listener).onGoAwayRead(eq(ctx), eq(1), eq(MAX_UNSIGNED_INT), eq(data)); - } finally { - if (frame != null) { - frame.release(); - } - data.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onGoAwayRead(eq(ctx), eq(1), eq(MAX_UNSIGNED_INT), eq(data)); } @Test public void windowUpdateShouldRoundtrip() throws Exception { writer.writeWindowUpdate(ctx, 1, Integer.MAX_VALUE, promise); - - ByteBuf frame = captureWrite(); - try { - reader.readFrame(ctx, frame, listener); - verify(listener).onWindowUpdateRead(eq(ctx), eq(1), eq(Integer.MAX_VALUE)); - } finally { - frame.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onWindowUpdateRead(eq(ctx), eq(1), eq(Integer.MAX_VALUE)); } @Test public void emptyHeadersShouldRoundtrip() throws Exception { Http2Headers headers = EmptyHttp2Headers.INSTANCE; writer.writeHeaders(ctx, 1, headers, 0, true, promise); - - ByteBuf frame = captureWrite(); - try { - reader.readFrame(ctx, frame, listener); - verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0), eq(true)); - } finally { - frame.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0), eq(true)); } @Test public void emptyHeadersWithPaddingShouldRoundtrip() throws Exception { Http2Headers headers = EmptyHttp2Headers.INSTANCE; writer.writeHeaders(ctx, 1, headers, 0xFF, true, promise); - - ByteBuf frame = captureWrite(); - try { - reader.readFrame(ctx, frame, listener); - verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0xFF), eq(true)); - } finally { - frame.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0xFF), eq(true)); } @Test public void binaryHeadersWithoutPriorityShouldRoundtrip() throws Exception { Http2Headers headers = dummyBinaryHeaders(); writer.writeHeaders(ctx, 1, headers, 0, true, promise); - ByteBuf frame = captureWrite(); - try { - reader.readFrame(ctx, frame, listener); - verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0), eq(true)); - } finally { - frame.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0), eq(true)); } @Test public void headersWithoutPriorityShouldRoundtrip() throws Exception { Http2Headers headers = dummyHeaders(); writer.writeHeaders(ctx, 1, headers, 0, true, promise); - - ByteBuf frame = captureWrite(); - try { - reader.readFrame(ctx, frame, listener); - verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0), eq(true)); - } finally { - frame.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0), eq(true)); } @Test public void headersWithPaddingWithoutPriorityShouldRoundtrip() throws Exception { Http2Headers headers = dummyHeaders(); writer.writeHeaders(ctx, 1, headers, 0xFF, true, promise); - - ByteBuf frame = captureWrite(); - try { - reader.readFrame(ctx, frame, listener); - verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0xFF), eq(true)); - } finally { - frame.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0xFF), eq(true)); } @Test public void headersWithPriorityShouldRoundtrip() throws Exception { Http2Headers headers = dummyHeaders(); writer.writeHeaders(ctx, 1, headers, 2, (short) 3, true, 0, true, promise); - - ByteBuf frame = captureWrite(); - try { - reader.readFrame(ctx, frame, listener); - verify(listener) - .onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0), eq(true)); - } finally { - frame.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener) + .onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0), eq(true)); } @Test public void headersWithPaddingWithPriorityShouldRoundtrip() throws Exception { Http2Headers headers = dummyHeaders(); writer.writeHeaders(ctx, 1, headers, 2, (short) 3, true, 0xFF, true, promise); - - ByteBuf frame = captureWrite(); - try { - reader.readFrame(ctx, frame, listener); - verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0xFF), - eq(true)); - } finally { - frame.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0xFF), + eq(true)); } @Test public void continuedHeadersShouldRoundtrip() throws Exception { Http2Headers headers = largeHeaders(); writer.writeHeaders(ctx, 1, headers, 2, (short) 3, true, 0, true, promise); - - ByteBuf frame = captureWrite(); - try { - reader.readFrame(ctx, frame, listener); - verify(listener) - .onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0), eq(true)); - } finally { - frame.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener) + .onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0), eq(true)); } @Test public void continuedHeadersWithPaddingShouldRoundtrip() throws Exception { Http2Headers headers = largeHeaders(); writer.writeHeaders(ctx, 1, headers, 2, (short) 3, true, 0xFF, true, promise); - - ByteBuf frame = captureWrite(); - try { - reader.readFrame(ctx, frame, listener); - verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0xFF), - eq(true)); - } finally { - frame.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0xFF), + eq(true)); } @Test public void emptypushPromiseShouldRoundtrip() throws Exception { Http2Headers headers = EmptyHttp2Headers.INSTANCE; writer.writePushPromise(ctx, 1, 2, headers, 0, promise); - - ByteBuf frame = captureWrite(); - try { - reader.readFrame(ctx, frame, listener); - verify(listener).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0)); - } finally { - frame.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0)); } @Test public void pushPromiseShouldRoundtrip() throws Exception { Http2Headers headers = dummyHeaders(); writer.writePushPromise(ctx, 1, 2, headers, 0, promise); - - ByteBuf frame = captureWrite(); - try { - reader.readFrame(ctx, frame, listener); - verify(listener).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0)); - } finally { - frame.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0)); } @Test public void pushPromiseWithPaddingShouldRoundtrip() throws Exception { Http2Headers headers = dummyHeaders(); writer.writePushPromise(ctx, 1, 2, headers, 0xFF, promise); - - ByteBuf frame = captureWrite(); - try { - reader.readFrame(ctx, frame, listener); - verify(listener).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0xFF)); - } finally { - frame.release(); - } + reader.readFrame(ctx, buffer, listener); + verify(listener).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0xFF)); } @Test public void continuedPushPromiseShouldRoundtrip() throws Exception { Http2Headers headers = largeHeaders(); writer.writePushPromise(ctx, 1, 2, headers, 0, promise); - ByteBuf frame = captureWrite(); - reader.readFrame(ctx, frame, listener); + reader.readFrame(ctx, buffer, listener); verify(listener).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0)); - frame.release(); } @Test public void continuedPushPromiseWithPaddingShouldRoundtrip() throws Exception { Http2Headers headers = largeHeaders(); writer.writePushPromise(ctx, 1, 2, headers, 0xFF, promise); - - ByteBuf frame = captureWrite(); - try { - reader.readFrame(ctx, frame, listener); - verify(listener).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0xFF)); - } finally { - frame.release(); - } - } - - private ByteBuf captureWrite() throws InterruptedException { - assertTrue(latch.await(2, TimeUnit.SECONDS)); - return buffer; + reader.readFrame(ctx, buffer, listener); + verify(listener).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0xFF)); } private ByteBuf dummyData() {