From 8271c8afccf7eb21bf6f994b5f3565f6d55bdee4 Mon Sep 17 00:00:00 2001 From: Louis Ryan Date: Thu, 23 Apr 2015 14:23:23 -0700 Subject: [PATCH] Remove explicit flushes from HTTP2 encoders, decoders & flow-controllers Motivation: Allow users of HTTP2 to control when flushes occur so they can optimize network writes. Modifications: Removed explicit calls to flush in encoder, decoder & flow-controller Connection handler now calls flush on read-complete to enable batching writes in response to reads Result: Much less flushing occurs for normal HTTP2 request and response patterns. --- .../http2/DefaultHttp2ConnectionDecoder.java | 1 - .../http2/DefaultHttp2ConnectionEncoder.java | 22 ++----- .../DefaultHttp2LocalFlowController.java | 1 - .../DefaultHttp2RemoteFlowController.java | 25 +------ .../codec/http2/Http2ConnectionHandler.java | 14 +++- .../http2/Http2RemoteFlowController.java | 9 ++- .../DefaultHttp2ConnectionDecoderTest.java | 6 +- .../DefaultHttp2ConnectionEncoderTest.java | 12 ++-- .../DefaultHttp2LocalFlowControllerTest.java | 2 + .../DefaultHttp2RemoteFlowControllerTest.java | 65 ++++++++----------- .../http2/Http2ConnectionHandlerTest.java | 7 ++ .../http2/Http2ConnectionRoundtripTest.java | 7 ++ 12 files changed, 79 insertions(+), 92 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java index 3819e159a9..0bf68ef679 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java @@ -443,7 +443,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { // Send an ack back to the remote client. // Need to retain the buffer here since it will be released after the write completes. encoder.writePing(ctx, true, data.retain(), ctx.newPromise()); - ctx.flush(); listener.onPingRead(ctx, data); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java index d64045583f..7d86936552 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java @@ -199,7 +199,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { } ChannelFuture future = frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise); - ctx.flush(); return future; } @@ -224,21 +223,18 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { } ChannelFuture future = frameWriter.writeSettings(ctx, settings, promise); - ctx.flush(); return future; } @Override public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) { ChannelFuture future = frameWriter.writeSettingsAck(ctx, promise); - ctx.flush(); return future; } @Override public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data, ChannelPromise promise) { ChannelFuture future = frameWriter.writePing(ctx, ack, data, promise); - ctx.flush(); return future; } @@ -258,7 +254,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { } ChannelFuture future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise); - ctx.flush(); return future; } @@ -345,16 +340,13 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { } @Override - public boolean write(int allowedBytes) { + public void write(int allowedBytes) { int bytesWritten = 0; + if (data == null || (allowedBytes == 0 && size != 0)) { + // No point writing an empty DATA frame, wait for a bigger allowance. + return; + } try { - if (data == null) { - return false; - } - if (allowedBytes == 0 && size != 0) { - // No point writing an empty DATA frame, wait for a bigger allowance. - return false; - } int maxFrameSize = frameWriter().configuration().frameSizePolicy().maxFrameSize(); do { int allowedFrameSize = Math.min(maxFrameSize, allowedBytes - bytesWritten); @@ -386,7 +378,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { frameWriter().writeData(ctx, stream.id(), toWrite, writeablePadding, size == bytesWritten && endOfStream, writePromise); } while (size != bytesWritten && allowedBytes > bytesWritten); - return true; } finally { size -= bytesWritten; } @@ -427,10 +418,9 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { } @Override - public boolean write(int allowedBytes) { + public void write(int allowedBytes) { frameWriter().writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive, padding, endOfStream, promise); - return true; } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java index 9001145388..6d6369b492 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java @@ -413,7 +413,6 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController // Send a window update for the stream/connection. frameWriter.writeWindowUpdate(ctx, stream.id(), deltaWindowSize, ctx.newPromise()); - ctx.flush(); } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java index 5e15a83a93..bc4d7d6e2c 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java @@ -46,7 +46,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll private final Http2Connection.PropertyKey stateKey; private int initialWindowSize = DEFAULT_WINDOW_SIZE; private ChannelHandlerContext ctx; - private boolean needFlush; public DefaultHttp2RemoteFlowController(Http2Connection connection) { this.connection = checkNotNull(connection, "connection"); @@ -185,7 +184,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll AbstractState state = state(stream); state.incrementStreamWindow(delta); state.writeBytes(state.writableWindow()); - flush(); } } @@ -207,11 +205,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll return; } state.writeBytes(state.writableWindow()); - try { - flush(); - } catch (Throwable t) { - frame.error(t); - } } /** @@ -237,16 +230,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll return connectionState().windowSize(); } - /** - * Flushes the {@link ChannelHandlerContext} if we've received any data frames. - */ - private void flush() { - if (needFlush) { - ctx.flush(); - needFlush = false; - } - } - /** * Writes as many pending bytes as possible, according to stream priority. */ @@ -260,7 +243,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll // Now write all of the allocated bytes. connection.forEachActiveStream(WRITE_ALLOCATED_BYTES); - flush(); } } @@ -604,13 +586,10 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll /** * Writes the frame and decrements the stream and connection window sizes. If the frame is in the pending * queue, the written bytes are removed from this branch of the priority tree. - *

- * Note: this does not flush the {@link ChannelHandlerContext}. - *

*/ private int write(FlowControlled frame, int allowedBytes) { int before = frame.size(); - int writtenBytes = 0; + int writtenBytes; // In case an exception is thrown we want to remember it and pass it to cancel(Throwable). Throwable cause = null; try { @@ -618,7 +597,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll // Write the portion of the frame. writing = true; - needFlush |= frame.write(max(0, allowedBytes)); + frame.write(max(0, allowedBytes)); if (!cancelled && frame.size() == 0) { // This frame has been fully written, remove this frame and notify it. Since we remove this frame // first, we're guaranteed that its error method will not be called when we call cancel. diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index 0d996be012..75a50ee584 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -359,6 +359,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http } ChannelFuture future = goAway(ctx, null); + ctx.flush(); // If there are no active streams, close immediately after the send is complete. // Otherwise wait until all streams are inactive. @@ -389,6 +390,13 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http ctx.flush(); } + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + // Trigger flush after read on the assumption that flush is cheap if there is nothing to write and that + // for flow-control the read may release window that causes data to be written that can now be flushed. + ctx.flush(); + } + /** * Handles {@link Http2Exception} objects that were thrown from other handlers. Ignores all other exceptions. */ @@ -478,6 +486,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http } else { onConnectionError(ctx, cause, embedded); } + ctx.flush(); } /** @@ -522,7 +531,6 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http } ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise); - ctx.flush(); // Synchronously set the resetSent flag to prevent any subsequent calls // from resulting in multiple reset frames being sent. @@ -557,7 +565,6 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http connection.goAwaySent(lastStreamId, errorCode, debugData); ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise); - ctx.flush(); future.addListener(new GenericFutureListener() { @Override @@ -585,7 +592,8 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http } /** - * Close the remote endpoint with with a {@code GO_AWAY} frame. + * Close the remote endpoint with with a {@code GO_AWAY} frame. Does not flush + * immediately, this is the responsibility of the caller. */ private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause) { long errorCode = cause != null ? cause.error().code() : NO_ERROR.code(); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java index 75fa02bf34..51ac7a1614 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java @@ -27,8 +27,8 @@ public interface Http2RemoteFlowController extends Http2FlowController { * guarantee when the data will be written or whether it will be split into multiple frames * before sending. *

- * Manually flushing the {@link ChannelHandlerContext} is not required, since the flow - * controller will flush as appropriate. + * Manually flushing the {@link ChannelHandlerContext} is required for writes as the flow controller will + * not flush by itself. * * @param ctx the context from the handler. * @param stream the subject stream. Must not be the connection stream object. @@ -75,15 +75,14 @@ public interface Http2RemoteFlowController extends Http2FlowController { * Writes up to {@code allowedBytes} of the encapsulated payload to the stream. Note that * a value of 0 may be passed which will allow payloads with flow-control size == 0 to be * written. The flow-controller may call this method multiple times with different values until - * the payload is fully written. + * the payload is fully written, i.e it's size after the write is 0. *

* When an exception is thrown the {@link Http2RemoteFlowController} will make a call to * {@link #error(Throwable)}. *

* * @param allowedBytes an upper bound on the number of bytes the payload can write at this time. - * @return {@code true} if a flush is required, {@code false} otherwise. */ - boolean write(int allowedBytes); + void write(int allowedBytes); } } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java index 6fb4544c2b..648b022645 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java @@ -35,6 +35,7 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -48,6 +49,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException; +import junit.framework.AssertionFailedError; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -161,6 +163,9 @@ public class DefaultHttp2ConnectionDecoderTest { // Simulate receiving the SETTINGS ACK for the initial settings. decode().onSettingsAckRead(ctx); + + // Disallow any further flushes now that settings ACK has been sent + when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden")); } @Test @@ -605,7 +610,6 @@ public class DefaultHttp2ConnectionDecoderTest { @Test public void settingsReadShouldSetValues() throws Exception { - when(connection.isServer()).thenReturn(true); Http2Settings settings = new Http2Settings(); settings.pushEnabled(true); settings.initialWindowSize(123); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java index deb29e79b6..bf198a4b49 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java @@ -60,6 +60,7 @@ import io.netty.util.concurrent.ImmediateEventExecutor; import java.util.ArrayList; import java.util.List; +import junit.framework.AssertionFailedError; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -207,6 +208,7 @@ public class DefaultHttp2ConnectionEncoderTest { when(ctx.newSucceededFuture()).thenReturn(future); when(ctx.newPromise()).thenReturn(promise); when(ctx.write(any())).thenReturn(future); + when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden")); encoder = new DefaultHttp2ConnectionEncoder(connection, writer); encoder.lifecycleManager(lifecycleManager); @@ -217,7 +219,7 @@ public class DefaultHttp2ConnectionEncoderTest { final ByteBuf data = dummyData(); encoder.writeData(ctx, STREAM_ID, data, 0, true, promise); assertEquals(payloadCaptor.getValue().size(), 8); - assertTrue(payloadCaptor.getValue().write(8)); + payloadCaptor.getValue().write(8); assertEquals(0, payloadCaptor.getValue().size()); assertEquals("abcdefgh", writtenData.get(0)); assertEquals(0, data.refCnt()); @@ -229,7 +231,7 @@ public class DefaultHttp2ConnectionEncoderTest { final ByteBuf data = dummyData(); encoder.writeData(ctx, STREAM_ID, data, 0, true, promise); assertEquals(payloadCaptor.getValue().size(), 8); - assertTrue(payloadCaptor.getValue().write(8)); + payloadCaptor.getValue().write(8); // writer was called 3 times assertEquals(3, writtenData.size()); assertEquals("abc", writtenData.get(0)); @@ -244,7 +246,7 @@ public class DefaultHttp2ConnectionEncoderTest { final ByteBuf data = dummyData(); encoder.writeData(ctx, STREAM_ID, data, 5, true, promise); assertEquals(payloadCaptor.getValue().size(), 13); - assertTrue(payloadCaptor.getValue().write(13)); + payloadCaptor.getValue().write(13); // writer was called 3 times assertEquals(3, writtenData.size()); assertEquals("abcde", writtenData.get(0)); @@ -262,7 +264,7 @@ public class DefaultHttp2ConnectionEncoderTest { ByteBuf data = dummyData(); encoder.writeData(ctx, STREAM_ID, data, 10, true, promise); assertEquals(payloadCaptor.getValue().size(), 18); - assertTrue(payloadCaptor.getValue().write(18)); + payloadCaptor.getValue().write(18); // writer was called 4 times assertEquals(4, writtenData.size()); assertEquals("abcde", writtenData.get(0)); @@ -292,7 +294,7 @@ public class DefaultHttp2ConnectionEncoderTest { when(frameSizePolicy.maxFrameSize()).thenReturn(5); encoder.writeData(ctx, STREAM_ID, data, 10, true, promise); assertEquals(payloadCaptor.getValue().size(), 10); - assertTrue(payloadCaptor.getValue().write(10)); + payloadCaptor.getValue().write(10); // writer was called 2 times assertEquals(2, writtenData.size()); assertEquals("", writtenData.get(0)); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java index 30400d72ee..5fc4e0df87 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java @@ -30,6 +30,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import junit.framework.AssertionFailedError; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -64,6 +65,7 @@ public class DefaultHttp2LocalFlowControllerTest { MockitoAnnotations.initMocks(this); when(ctx.newPromise()).thenReturn(promise); + when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden")); connection = new DefaultHttp2Connection(false); controller = new DefaultHttp2LocalFlowController(connection, frameWriter, updateRatio); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java index b69b3bcdac..90e7e9d6d0 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java @@ -26,6 +26,7 @@ import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; @@ -40,7 +41,9 @@ import io.netty.util.collection.IntObjectMap; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import junit.framework.AssertionFailedError; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -83,6 +86,7 @@ public class DefaultHttp2RemoteFlowControllerTest { MockitoAnnotations.initMocks(this); when(ctx.newPromise()).thenReturn(promise); + when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden")); connection = new DefaultHttp2Connection(false); controller = new DefaultHttp2RemoteFlowController(connection); @@ -127,19 +131,17 @@ public class DefaultHttp2RemoteFlowControllerTest { } @Test - public void payloadSmallerThanWindowShouldBeSentImmediately() throws Http2Exception { + public void payloadSmallerThanWindowShouldBeWrittenImmediately() throws Http2Exception { FakeFlowControlled data = new FakeFlowControlled(5); sendData(STREAM_A, data); data.assertFullyWritten(); - verify(ctx, times(1)).flush(); } @Test - public void emptyPayloadShouldBeSentImmediately() throws Http2Exception { + public void emptyPayloadShouldBeWrittenImmediately() throws Http2Exception { FakeFlowControlled data = new FakeFlowControlled(0); sendData(STREAM_A, data); data.assertFullyWritten(); - verify(ctx, times(1)).flush(); } @Test @@ -152,7 +154,6 @@ public class DefaultHttp2RemoteFlowControllerTest { data.assertNotWritten(); sendData(STREAM_A, moreData); moreData.assertNotWritten(); - verify(ctx, never()).flush(); } @Test @@ -165,7 +166,6 @@ public class DefaultHttp2RemoteFlowControllerTest { data.assertNotWritten(); sendData(STREAM_A, moreData); moreData.assertNotWritten(); - verify(ctx, never()).flush(); connection.stream(STREAM_A).close(); data.assertError(); @@ -180,7 +180,6 @@ public class DefaultHttp2RemoteFlowControllerTest { sendData(STREAM_A, data); // Verify that a partial frame of 5 remains to be sent data.assertPartiallyWritten(5); - verify(ctx, times(1)).flush(); } @Test @@ -193,14 +192,12 @@ public class DefaultHttp2RemoteFlowControllerTest { sendData(STREAM_A, moreData); data.assertPartiallyWritten(10); moreData.assertNotWritten(); - verify(ctx, times(1)).flush(); reset(ctx); // Update the window and verify that the rest of data and some of moreData are written incrementWindowSize(STREAM_A, 15); data.assertFullyWritten(); moreData.assertPartiallyWritten(5); - verify(ctx, times(1)).flush(); assertEquals(DEFAULT_WINDOW_SIZE - 25, window(CONNECTION_STREAM_ID)); assertEquals(0, window(STREAM_A)); @@ -1109,26 +1106,21 @@ public class DefaultHttp2RemoteFlowControllerTest { public void flowControlledWriteCompleteThrowsAnException() throws Exception { final Http2RemoteFlowController.FlowControlled flowControlled = Mockito.mock(Http2RemoteFlowController.FlowControlled.class); - when(flowControlled.size()).thenReturn(100); - when(flowControlled.write(anyInt())).thenAnswer(new Answer() { - private int invocationCount; + final AtomicInteger size = new AtomicInteger(150); + doAnswer(new Answer() { @Override - public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable { - switch(invocationCount) { - case 0: - when(flowControlled.size()).thenReturn(50); - invocationCount = 1; - return true; - case 1: - when(flowControlled.size()).thenReturn(20); - invocationCount = 2; - return true; - default: - when(flowControlled.size()).thenReturn(0); - return false; - } + public Integer answer(InvocationOnMock invocationOnMock) throws Throwable { + return size.get(); } - }); + }).when(flowControlled).size(); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + size.addAndGet(-50); + return null; + } + }).when(flowControlled).write(anyInt()); + final Http2Stream stream = stream(STREAM_A); doAnswer(new Answer() { public Void answer(InvocationOnMock invocationOnMock) { @@ -1148,7 +1140,7 @@ public class DefaultHttp2RemoteFlowControllerTest { verify(flowControlled, never()).error(any(Throwable.class)); verify(flowControlled).writeComplete(); - assertEquals(100, windowBefore - window(STREAM_A)); + assertEquals(150, windowBefore - window(STREAM_A)); } @Test @@ -1157,7 +1149,7 @@ public class DefaultHttp2RemoteFlowControllerTest { Mockito.mock(Http2RemoteFlowController.FlowControlled.class); final Http2Stream stream = stream(STREAM_A); when(flowControlled.size()).thenReturn(100); - when(flowControlled.write(anyInt())).thenThrow(new RuntimeException("write failed")); + doThrow(new RuntimeException("write failed")).when(flowControlled).write(anyInt()); doAnswer(new Answer() { public Void answer(InvocationOnMock invocationOnMock) { stream.close(); @@ -1176,25 +1168,25 @@ public class DefaultHttp2RemoteFlowControllerTest { final Http2RemoteFlowController.FlowControlled flowControlled = Mockito.mock(Http2RemoteFlowController.FlowControlled.class); when(flowControlled.size()).thenReturn(100); - when(flowControlled.write(anyInt())).thenAnswer(new Answer() { + doAnswer(new Answer() { private int invocationCount; @Override - public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable { + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { switch(invocationCount) { case 0: when(flowControlled.size()).thenReturn(50); invocationCount = 1; - return true; + return null; case 1: when(flowControlled.size()).thenReturn(20); invocationCount = 2; - return true; + return null; default: when(flowControlled.size()).thenReturn(10); throw new RuntimeException("Write failed"); } } - }); + }).when(flowControlled).write(anyInt()); return flowControlled; } @@ -1265,15 +1257,14 @@ public class DefaultHttp2RemoteFlowControllerTest { } @Override - public boolean write(int allowedBytes) { + public void write(int allowedBytes) { if (allowedBytes <= 0 && currentSize != 0) { // Write has been called but no data can be written - return false; + return; } writeCalled = true; int written = Math.min(currentSize, allowedBytes); currentSize -= written; - return true; } public int written() { diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java index 832ff66f40..bb54701006 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java @@ -358,4 +358,11 @@ public class Http2ConnectionHandlerTest { verify(data).release(); verifyNoMoreInteractions(frameWriter); } + + @Test + public void channelReadCompleteTriggersFlush() throws Exception { + handler = newHandler(); + handler.channelReadComplete(ctx); + verify(ctx, times(1)).flush(); + } } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java index b069c6afb6..9e85856137 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java @@ -130,6 +130,7 @@ public class Http2ConnectionRoundtripTest { public void run() { http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false, newPromise()); + ctx().flush(); } }); @@ -174,6 +175,7 @@ public class Http2ConnectionRoundtripTest { public void run() { http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false, newPromise()); + ctx().flush(); } }); @@ -206,6 +208,7 @@ public class Http2ConnectionRoundtripTest { public void run() { http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false, newPromise()); + ctx().flush(); } }); @@ -237,6 +240,7 @@ public class Http2ConnectionRoundtripTest { public void run() { http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, true, newPromise()); + ctx().flush(); } }); @@ -247,6 +251,7 @@ public class Http2ConnectionRoundtripTest { public void run() { http2Client.encoder().writeHeaders(ctx(), Integer.MAX_VALUE + 1, headers, 0, (short) 16, false, 0, true, newPromise()); + ctx().flush(); } }); @@ -292,6 +297,7 @@ public class Http2ConnectionRoundtripTest { // Write trailers. http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, true, newPromise()); + ctx().flush(); } }); @@ -376,6 +382,7 @@ public class Http2ConnectionRoundtripTest { // Write trailers. http2Client.encoder().writeHeaders(ctx(), streamId, headers, 0, (short) 16, false, 0, true, newPromise()); + ctx().flush(); } } });