From e7efe2b929b95d88fff7d413dba8a7c500a3db1d Mon Sep 17 00:00:00 2001 From: nmittler Date: Tue, 18 Nov 2014 11:16:26 -0800 Subject: [PATCH] Fixing HTTP/2 processed byte accounting during exception Motivation: Currently when an exception occurs during a listener.onDataRead callback, we return all bytes as processed. However, the listener may choose to return bytes via the InboundFlowState object rather than returning the integer. If the listener returns a few bytes and then throws, we will attempt to return too many bytes. Modifications: Added InboundFlowState.unProcessedBytes() to indicate how many unprocessed bytes are outstanding. Updated DefaultHttp2ConnectionDecoder to compare the unprocessed bytes before and after the listener.onDataRead callback when an exception was encountered. If there is a difference, it is subtracted off the total processed bytes to be returned to the flow controller. Result: HTTP/2 data frame delivery properly accounts for processed bytes through an exception. --- .../http2/DefaultHttp2ConnectionDecoder.java | 33 ++++++++++--- .../DefaultHttp2InboundFlowController.java | 17 +++++-- .../codec/http2/Http2InboundFlowState.java | 5 ++ .../DefaultHttp2ConnectionDecoderTest.java | 47 +++++++++++++++++++ 4 files changed, 93 insertions(+), 9 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java index 73988cd2a8..0781ef3ea0 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java @@ -192,6 +192,10 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { frameReader.close(); } + private static int unprocessedBytes(Http2Stream stream) { + return stream.inboundFlow().unProcessedBytes(); + } + /** * Handles all inbound frames from the network. */ @@ -212,7 +216,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { // We should ignore this frame if RST_STREAM was sent or if GO_AWAY was sent with a // lower stream ID. boolean shouldApplyFlowControl = false; - int processedBytes = data.readableBytes() + padding; boolean shouldIgnore = shouldIgnoreFrame(stream); Http2Exception error = null; switch (stream.state()) { @@ -240,15 +243,19 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { break; } + int bytesToReturn = data.readableBytes() + padding; + int unprocessedBytes = unprocessedBytes(stream); try { // If we should apply flow control, do so now. if (shouldApplyFlowControl) { inboundFlow.applyFlowControl(ctx, streamId, data, padding, endOfStream); + // Update the unprocessed bytes after flow control is applied. + unprocessedBytes = unprocessedBytes(stream); } // If we should ignore this frame, do so now. if (shouldIgnore) { - return processedBytes; + return bytesToReturn; } // If the stream was in an invalid state to receive the frame, throw the error. @@ -258,12 +265,26 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { // Call back the application and retrieve the number of bytes that have been // immediately processed. - processedBytes = listener.onDataRead(ctx, streamId, data, padding, endOfStream); - return processedBytes; + bytesToReturn = listener.onDataRead(ctx, streamId, data, padding, endOfStream); + return bytesToReturn; + } catch (Http2Exception e) { + // If an exception happened during delivery, the listener may have returned part + // of the bytes before the error occurred. If that's the case, subtract that from + // the total processed bytes so that we don't return too many bytes. + int delta = unprocessedBytes - unprocessedBytes(stream); + bytesToReturn -= delta; + throw e; + } catch (RuntimeException e) { + // If an exception happened during delivery, the listener may have returned part + // of the bytes before the error occurred. If that's the case, subtract that from + // the total processed bytes so that we don't return too many bytes. + int delta = unprocessedBytes - unprocessedBytes(stream); + bytesToReturn -= delta; + throw e; } finally { // If appropriate, returned the processed bytes to the flow controller. - if (shouldApplyFlowControl && processedBytes > 0) { - stream.inboundFlow().returnProcessedBytes(ctx, processedBytes); + if (shouldApplyFlowControl && bytesToReturn > 0) { + stream.inboundFlow().returnProcessedBytes(ctx, bytesToReturn); } if (endOfStream) { diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java index d5fce6d9ea..792f376b9d 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java @@ -17,6 +17,7 @@ package io.netty.handler.codec.http2; import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; +import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.flowControlError; import static io.netty.handler.codec.http2.Http2Exception.protocolError; import static io.netty.util.internal.ObjectUtil.checkNotNull; @@ -208,6 +209,11 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro updateWindowIfAppropriate(ctx); } + @Override + public int unProcessedBytes() { + return processedWindow - window; + } + /** * Updates the flow control window for this stream if it is appropriate. */ @@ -225,10 +231,15 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro /** * Returns the processed bytes for this stream. */ - void returnProcessedBytes(int delta) { + void returnProcessedBytes(int delta) throws Http2Exception { if (processedWindow - delta < window) { - throw new IllegalArgumentException( - "Attempting to return too many bytes for stream " + streamId); + if (streamId == CONNECTION_STREAM_ID) { + throw new Http2Exception(INTERNAL_ERROR, + "Attempting to return too many bytes for connection"); + } else { + throw new Http2StreamException(streamId, INTERNAL_ERROR, + "Attempting to return too many bytes for stream " + streamId); + } } processedWindow -= delta; } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowState.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowState.java index ad1a5e7c4a..5b9306724d 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowState.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowState.java @@ -33,4 +33,9 @@ public interface Http2InboundFlowState extends Http2FlowState { * @param numBytes the number of bytes to be returned to the flow control window. */ void returnProcessedBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception; + + /** + * The number of bytes that are outstanding and have not yet been returned to the flow controller. + */ + int unProcessedBytes(); } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java index 44020f4ca3..365f4b68d4 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java @@ -23,7 +23,9 @@ import static io.netty.handler.codec.http2.Http2Exception.protocolError; import static io.netty.handler.codec.http2.Http2Stream.State.OPEN; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE; import static io.netty.util.CharsetUtil.UTF_8; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; @@ -45,6 +47,7 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Test; @@ -235,6 +238,50 @@ public class DefaultHttp2ConnectionDecoderTest { } } + @Test + public void errorDuringDeliveryShouldReturnCorrectNumberOfBytes() throws Exception { + final ByteBuf data = dummyData(); + final int padding = 10; + final AtomicInteger unprocessed = new AtomicInteger(data.readableBytes() + padding); + doAnswer(new Answer() { + @Override + public Integer answer(InvocationOnMock in) throws Throwable { + return unprocessed.get(); + } + }).when(inFlowState).unProcessedBytes(); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock in) throws Throwable { + int delta = (Integer) in.getArguments()[1]; + int newValue = unprocessed.addAndGet(-delta); + if (newValue < 0) { + throw new RuntimeException("Returned too many bytes"); + } + return null; + } + }).when(inFlowState).returnProcessedBytes(eq(ctx), anyInt()); + // When the listener callback is called, process a few bytes and then throw. + doAnswer(new Answer() { + @Override + public Integer answer(InvocationOnMock in) throws Throwable { + inFlowState.returnProcessedBytes(ctx, 4); + throw new RuntimeException("Fake Exception"); + } + }).when(listener).onDataRead(eq(ctx), eq(STREAM_ID), any(ByteBuf.class), eq(10), eq(true)); + try { + decode().onDataRead(ctx, STREAM_ID, data, padding, true); + fail("Expected exception"); + } catch (RuntimeException cause) { + verify(inboundFlow) + .applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(padding), eq(true)); + verify(lifecycleManager).closeRemoteSide(eq(stream), eq(future)); + verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(padding), eq(true)); + assertEquals(0, inFlowState.unProcessedBytes()); + } finally { + data.release(); + } + } + @Test public void headersReadAfterGoAwayShouldBeIgnored() throws Exception { when(connection.goAwaySent()).thenReturn(true);