diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java index 50439c9fe7..78ef230c62 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java @@ -44,7 +44,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor private final Http2Connection connection; private final boolean strict; private boolean flowControllerInitialized; - final Http2Connection.PropertyKey propertyKey; + private final Http2Connection.PropertyKey propertyKey; public DelegatingDecompressorFrameListener(Http2Connection connection, Http2FrameListener listener) { this(connection, listener, true); @@ -62,7 +62,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor public void onStreamRemoved(Http2Stream stream) { final Http2Decompressor decompressor = decompressor(stream); if (decompressor != null) { - cleanup(stream, decompressor); + cleanup(decompressor); } } }); @@ -80,7 +80,6 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor final EmbeddedChannel channel = decompressor.decompressor(); final int compressedBytes = data.readableBytes() + padding; - int processedBytes = 0; decompressor.incrementCompressedBytes(compressedBytes); try { // call retain here as it will call release after its written to the channel @@ -97,43 +96,44 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor // not be provided with data and thus could not return how many bytes were processed. We will assume // there is more data coming which will complete the decompression block. To allow for more data we // return all bytes to the flow control window (so the peer can send more data). - decompressor.incrementDecompressedByes(compressedBytes); - processedBytes = compressedBytes; - } else { - try { - decompressor.incrementDecompressedByes(padding); - for (;;) { - ByteBuf nextBuf = nextReadableBuf(channel); - boolean decompressedEndOfStream = nextBuf == null && endOfStream; - if (decompressedEndOfStream && channel.finish()) { - nextBuf = nextReadableBuf(channel); - decompressedEndOfStream = nextBuf == null; - } - - decompressor.incrementDecompressedByes(buf.readableBytes()); - processedBytes += listener.onDataRead(ctx, streamId, buf, padding, decompressedEndOfStream); - if (nextBuf == null) { - break; - } - - padding = 0; // Padding is only communicated once on the first iteration - buf.release(); - buf = nextBuf; - } - } finally { - buf.release(); - } + decompressor.incrementDecompressedBytes(compressedBytes); + return compressedBytes; + } + try { + Http2LocalFlowController flowController = connection.local().flowController(); + decompressor.incrementDecompressedBytes(padding); + for (;;) { + ByteBuf nextBuf = nextReadableBuf(channel); + boolean decompressedEndOfStream = nextBuf == null && endOfStream; + if (decompressedEndOfStream && channel.finish()) { + nextBuf = nextReadableBuf(channel); + decompressedEndOfStream = nextBuf == null; + } + + decompressor.incrementDecompressedBytes(buf.readableBytes()); + // Immediately return the bytes back to the flow controller. ConsumedBytesConverter will convert + // from the decompressed amount which the user knows about to the compressed amount which flow + // control knows about. + flowController.consumeBytes(stream, + listener.onDataRead(ctx, streamId, buf, padding, decompressedEndOfStream)); + if (nextBuf == null) { + break; + } + + padding = 0; // Padding is only communicated once on the first iteration. + buf.release(); + buf = nextBuf; + } + // We consume bytes each time we call the listener to ensure if multiple frames are decompressed + // that the bytes are accounted for immediately. Otherwise the user may see an inconsistent state of + // flow control. + return 0; + } finally { + buf.release(); } - decompressor.incrementProcessedBytes(processedBytes); - // The processed bytes will be translated to pre-decompressed byte amounts by DecompressorGarbageCollector - return processedBytes; } catch (Http2Exception e) { - // Consider all the bytes consumed because there was an error - decompressor.incrementProcessedBytes(compressedBytes); throw e; } catch (Throwable t) { - // Consider all the bytes consumed because there was an error - decompressor.incrementProcessedBytes(compressedBytes); throw streamError(stream.id(), INTERNAL_ERROR, t, "Decompressor error detected while delegating data read on streamId %d", stream.id()); } @@ -250,24 +250,12 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor } /** - * Release remaining content from the {@link EmbeddedChannel} and remove the decompressor - * from the {@link Http2Stream}. + * Release remaining content from the {@link EmbeddedChannel}. * - * @param stream The stream for which {@code decompressor} is the decompressor for * @param decompressor The decompressor for {@code stream} */ - private void cleanup(Http2Stream stream, Http2Decompressor decompressor) { - final EmbeddedChannel channel = decompressor.decompressor(); - if (channel.finish()) { - for (;;) { - final ByteBuf buf = channel.readInbound(); - if (buf == null) { - break; - } - buf.release(); - } - } - decompressor = stream.removeProperty(propertyKey); + private static void cleanup(Http2Decompressor decompressor) { + decompressor.decompressor().finishAndReleaseAll(); } /** @@ -340,26 +328,18 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor @Override public boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception { Http2Decompressor decompressor = decompressor(stream); - Http2Decompressor copy = null; + if (decompressor != null) { + // Convert the decompressed bytes to compressed (on the wire) bytes. + numBytes = decompressor.consumeBytes(stream.id(), numBytes); + } try { - if (decompressor != null) { - // Make a copy before hand in case any exceptions occur we will roll back the state - copy = new Http2Decompressor(decompressor); - // Convert the uncompressed consumed bytes to compressed (on the wire) bytes. - numBytes = decompressor.consumeProcessedBytes(numBytes); - } return flowController.consumeBytes(stream, numBytes); } catch (Http2Exception e) { - if (copy != null) { - stream.setProperty(propertyKey, copy); - } throw e; } catch (Throwable t) { - if (copy != null) { - stream.setProperty(propertyKey, copy); - } - throw new Http2Exception(INTERNAL_ERROR, - "Error while returning bytes to flow control window", t); + // The stream should be closed at this point. We have already changed our state tracking the compressed + // bytes, and there is no guarantee we can recover if the underlying flow controller throws. + throw streamError(stream.id(), INTERNAL_ERROR, t, "Error while returning bytes to flow control window"); } } @@ -379,17 +359,9 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor */ private static final class Http2Decompressor { private final EmbeddedChannel decompressor; - private int processed; private int compressed; private int decompressed; - Http2Decompressor(Http2Decompressor rhs) { - this(rhs.decompressor); - processed = rhs.processed; - compressed = rhs.compressed; - decompressed = rhs.decompressed; - } - Http2Decompressor(EmbeddedChannel decompressor) { this.decompressor = decompressor; } @@ -401,53 +373,49 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor return decompressor; } - /** - * Increment the number of decompressed bytes processed by the application. - */ - void incrementProcessedBytes(int delta) { - if (processed + delta < 0) { - throw new IllegalArgumentException("processed bytes cannot be negative"); - } - processed += delta; - } - /** * Increment the number of bytes received prior to doing any decompression. */ void incrementCompressedBytes(int delta) { - if (compressed + delta < 0) { - throw new IllegalArgumentException("compressed bytes cannot be negative"); - } + assert delta >= 0; compressed += delta; } /** - * Increment the number of bytes after the decompression process. Under normal circumstances this - * delta should not exceed {@link Http2Decompressor#processed)}. + * Increment the number of bytes after the decompression process. */ - void incrementDecompressedByes(int delta) { - if (decompressed + delta < 0) { - throw new IllegalArgumentException("decompressed bytes cannot be negative"); - } + void incrementDecompressedBytes(int delta) { + assert delta >= 0; decompressed += delta; } /** - * Decrements {@link Http2Decompressor#processed} by {@code processedBytes} and determines the ratio - * between {@code processedBytes} and {@link Http2Decompressor#decompressed}. + * Determines the ratio between {@code numBytes} and {@link Http2Decompressor#decompressed}. * This ratio is used to decrement {@link Http2Decompressor#decompressed} and * {@link Http2Decompressor#compressed}. - * @param processedBytes The number of post-decompressed bytes that have been processed. + * @param streamId the stream ID + * @param decompressedBytes The number of post-decompressed bytes to return to flow control * @return The number of pre-decompressed bytes that have been consumed. */ - int consumeProcessedBytes(int processedBytes) { - // Consume the processed bytes first to verify that is is a valid amount - incrementProcessedBytes(-processedBytes); - - double consumedRatio = processedBytes / (double) decompressed; + int consumeBytes(int streamId, int decompressedBytes) throws Http2Exception { + if (decompressedBytes < 0) { + throw new IllegalArgumentException("decompressedBytes must not be negative: " + decompressedBytes); + } + if (decompressed - decompressedBytes < 0) { + throw streamError(streamId, INTERNAL_ERROR, + "Attempting to return too many bytes for stream %d. decompressed: %d " + + "decompressedBytes: %d", streamId, decompressed, decompressedBytes); + } + double consumedRatio = decompressedBytes / (double) decompressed; int consumedCompressed = Math.min(compressed, (int) Math.ceil(compressed * consumedRatio)); - incrementDecompressedByes(-Math.min(decompressed, (int) Math.ceil(decompressed * consumedRatio))); - incrementCompressedBytes(-consumedCompressed); + if (compressed - consumedCompressed < 0) { + throw streamError(streamId, INTERNAL_ERROR, + "overflow when converting decompressed bytes to compressed bytes for stream %d." + + "decompressedBytes: %d decompressed: %d compressed: %d consumedCompressed: %d", + streamId, decompressedBytes, decompressed, compressed, consumedCompressed); + } + decompressed -= decompressedBytes; + compressed -= consumedCompressed; return consumedCompressed; }