diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java index 73988cd2a8..0781ef3ea0 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java @@ -192,6 +192,10 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { frameReader.close(); } + private static int unprocessedBytes(Http2Stream stream) { + return stream.inboundFlow().unProcessedBytes(); + } + /** * Handles all inbound frames from the network. */ @@ -212,7 +216,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { // We should ignore this frame if RST_STREAM was sent or if GO_AWAY was sent with a // lower stream ID. boolean shouldApplyFlowControl = false; - int processedBytes = data.readableBytes() + padding; boolean shouldIgnore = shouldIgnoreFrame(stream); Http2Exception error = null; switch (stream.state()) { @@ -240,15 +243,19 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { break; } + int bytesToReturn = data.readableBytes() + padding; + int unprocessedBytes = unprocessedBytes(stream); try { // If we should apply flow control, do so now. if (shouldApplyFlowControl) { inboundFlow.applyFlowControl(ctx, streamId, data, padding, endOfStream); + // Update the unprocessed bytes after flow control is applied. + unprocessedBytes = unprocessedBytes(stream); } // If we should ignore this frame, do so now. if (shouldIgnore) { - return processedBytes; + return bytesToReturn; } // If the stream was in an invalid state to receive the frame, throw the error. @@ -258,12 +265,26 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { // Call back the application and retrieve the number of bytes that have been // immediately processed. - processedBytes = listener.onDataRead(ctx, streamId, data, padding, endOfStream); - return processedBytes; + bytesToReturn = listener.onDataRead(ctx, streamId, data, padding, endOfStream); + return bytesToReturn; + } catch (Http2Exception e) { + // If an exception happened during delivery, the listener may have returned part + // of the bytes before the error occurred. If that's the case, subtract that from + // the total processed bytes so that we don't return too many bytes. + int delta = unprocessedBytes - unprocessedBytes(stream); + bytesToReturn -= delta; + throw e; + } catch (RuntimeException e) { + // If an exception happened during delivery, the listener may have returned part + // of the bytes before the error occurred. If that's the case, subtract that from + // the total processed bytes so that we don't return too many bytes. + int delta = unprocessedBytes - unprocessedBytes(stream); + bytesToReturn -= delta; + throw e; } finally { // If appropriate, returned the processed bytes to the flow controller. - if (shouldApplyFlowControl && processedBytes > 0) { - stream.inboundFlow().returnProcessedBytes(ctx, processedBytes); + if (shouldApplyFlowControl && bytesToReturn > 0) { + stream.inboundFlow().returnProcessedBytes(ctx, bytesToReturn); } if (endOfStream) { diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java index d5fce6d9ea..792f376b9d 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java @@ -17,6 +17,7 @@ package io.netty.handler.codec.http2; import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; +import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.flowControlError; import static io.netty.handler.codec.http2.Http2Exception.protocolError; import static io.netty.util.internal.ObjectUtil.checkNotNull; @@ -208,6 +209,11 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro updateWindowIfAppropriate(ctx); } + @Override + public int unProcessedBytes() { + return processedWindow - window; + } + /** * Updates the flow control window for this stream if it is appropriate. */ @@ -225,10 +231,15 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro /** * Returns the processed bytes for this stream. */ - void returnProcessedBytes(int delta) { + void returnProcessedBytes(int delta) throws Http2Exception { if (processedWindow - delta < window) { - throw new IllegalArgumentException( - "Attempting to return too many bytes for stream " + streamId); + if (streamId == CONNECTION_STREAM_ID) { + throw new Http2Exception(INTERNAL_ERROR, + "Attempting to return too many bytes for connection"); + } else { + throw new Http2StreamException(streamId, INTERNAL_ERROR, + "Attempting to return too many bytes for stream " + streamId); + } } processedWindow -= delta; } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowState.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowState.java index ad1a5e7c4a..5b9306724d 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowState.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowState.java @@ -33,4 +33,9 @@ public interface Http2InboundFlowState extends Http2FlowState { * @param numBytes the number of bytes to be returned to the flow control window. */ void returnProcessedBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception; + + /** + * The number of bytes that are outstanding and have not yet been returned to the flow controller. + */ + int unProcessedBytes(); } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java index 44020f4ca3..365f4b68d4 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java @@ -23,7 +23,9 @@ import static io.netty.handler.codec.http2.Http2Exception.protocolError; import static io.netty.handler.codec.http2.Http2Stream.State.OPEN; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE; import static io.netty.util.CharsetUtil.UTF_8; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; @@ -45,6 +47,7 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Test; @@ -235,6 +238,50 @@ public class DefaultHttp2ConnectionDecoderTest { } } + @Test + public void errorDuringDeliveryShouldReturnCorrectNumberOfBytes() throws Exception { + final ByteBuf data = dummyData(); + final int padding = 10; + final AtomicInteger unprocessed = new AtomicInteger(data.readableBytes() + padding); + doAnswer(new Answer() { + @Override + public Integer answer(InvocationOnMock in) throws Throwable { + return unprocessed.get(); + } + }).when(inFlowState).unProcessedBytes(); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock in) throws Throwable { + int delta = (Integer) in.getArguments()[1]; + int newValue = unprocessed.addAndGet(-delta); + if (newValue < 0) { + throw new RuntimeException("Returned too many bytes"); + } + return null; + } + }).when(inFlowState).returnProcessedBytes(eq(ctx), anyInt()); + // When the listener callback is called, process a few bytes and then throw. + doAnswer(new Answer() { + @Override + public Integer answer(InvocationOnMock in) throws Throwable { + inFlowState.returnProcessedBytes(ctx, 4); + throw new RuntimeException("Fake Exception"); + } + }).when(listener).onDataRead(eq(ctx), eq(STREAM_ID), any(ByteBuf.class), eq(10), eq(true)); + try { + decode().onDataRead(ctx, STREAM_ID, data, padding, true); + fail("Expected exception"); + } catch (RuntimeException cause) { + verify(inboundFlow) + .applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(padding), eq(true)); + verify(lifecycleManager).closeRemoteSide(eq(stream), eq(future)); + verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(padding), eq(true)); + assertEquals(0, inFlowState.unProcessedBytes()); + } finally { + data.release(); + } + } + @Test public void headersReadAfterGoAwayShouldBeIgnored() throws Exception { when(connection.goAwaySent()).thenReturn(true);