HTTP/2 DelegatingDecompressorFrameListener return bytes to flow control

Motivation:
If a single DATA frame ends up being decompressed into multiple frames by DelegatingDecompressorFrameListener the flow control accounting is delayed until all frames have been decompressed. However it is possible the user may want to return bytes to the flow controller which were not included in the onDataRead return value. In this case the amount of processed bytes has not been incremented and will lead to negative value for processed bytes.

Modifications:
- Http2Decompressor.incrementProcessedBytes should be called each time onDataRead is called to ensure all bytes are accounted for at the correct time

Result:
Fixes https://github.com/netty/netty/issues/5375
This commit is contained in:
Scott Mitchell 2016-06-08 16:28:17 -07:00
parent 9687d77b5a
commit 6aa5f76d42

View File

@ -44,7 +44,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
private final Http2Connection connection;
private final boolean strict;
private boolean flowControllerInitialized;
final Http2Connection.PropertyKey propertyKey;
private final Http2Connection.PropertyKey propertyKey;
public DelegatingDecompressorFrameListener(Http2Connection connection, Http2FrameListener listener) {
this(connection, listener, true);
@ -62,7 +62,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
public void onStreamRemoved(Http2Stream stream) {
final Http2Decompressor decompressor = decompressor(stream);
if (decompressor != null) {
cleanup(stream, decompressor);
cleanup(decompressor);
}
}
});
@ -80,7 +80,6 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
final EmbeddedChannel channel = decompressor.decompressor();
final int compressedBytes = data.readableBytes() + padding;
int processedBytes = 0;
decompressor.incrementCompressedBytes(compressedBytes);
try {
// call retain here as it will call release after its written to the channel
@ -97,43 +96,44 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
// not be provided with data and thus could not return how many bytes were processed. We will assume
// there is more data coming which will complete the decompression block. To allow for more data we
// return all bytes to the flow control window (so the peer can send more data).
decompressor.incrementDecompressedByes(compressedBytes);
processedBytes = compressedBytes;
} else {
try {
decompressor.incrementDecompressedByes(padding);
for (;;) {
ByteBuf nextBuf = nextReadableBuf(channel);
boolean decompressedEndOfStream = nextBuf == null && endOfStream;
if (decompressedEndOfStream && channel.finish()) {
nextBuf = nextReadableBuf(channel);
decompressedEndOfStream = nextBuf == null;
}
decompressor.incrementDecompressedByes(buf.readableBytes());
processedBytes += listener.onDataRead(ctx, streamId, buf, padding, decompressedEndOfStream);
if (nextBuf == null) {
break;
}
padding = 0; // Padding is only communicated once on the first iteration
buf.release();
buf = nextBuf;
}
} finally {
buf.release();
}
decompressor.incrementDecompressedBytes(compressedBytes);
return compressedBytes;
}
try {
Http2LocalFlowController flowController = connection.local().flowController();
decompressor.incrementDecompressedBytes(padding);
for (;;) {
ByteBuf nextBuf = nextReadableBuf(channel);
boolean decompressedEndOfStream = nextBuf == null && endOfStream;
if (decompressedEndOfStream && channel.finish()) {
nextBuf = nextReadableBuf(channel);
decompressedEndOfStream = nextBuf == null;
}
decompressor.incrementDecompressedBytes(buf.readableBytes());
// Immediately return the bytes back to the flow controller. ConsumedBytesConverter will convert
// from the decompressed amount which the user knows about to the compressed amount which flow
// control knows about.
flowController.consumeBytes(stream,
listener.onDataRead(ctx, streamId, buf, padding, decompressedEndOfStream));
if (nextBuf == null) {
break;
}
padding = 0; // Padding is only communicated once on the first iteration.
buf.release();
buf = nextBuf;
}
// We consume bytes each time we call the listener to ensure if multiple frames are decompressed
// that the bytes are accounted for immediately. Otherwise the user may see an inconsistent state of
// flow control.
return 0;
} finally {
buf.release();
}
decompressor.incrementProcessedBytes(processedBytes);
// The processed bytes will be translated to pre-decompressed byte amounts by DecompressorGarbageCollector
return processedBytes;
} catch (Http2Exception e) {
// Consider all the bytes consumed because there was an error
decompressor.incrementProcessedBytes(compressedBytes);
throw e;
} catch (Throwable t) {
// Consider all the bytes consumed because there was an error
decompressor.incrementProcessedBytes(compressedBytes);
throw streamError(stream.id(), INTERNAL_ERROR, t,
"Decompressor error detected while delegating data read on streamId %d", stream.id());
}
@ -250,24 +250,12 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
}
/**
* Release remaining content from the {@link EmbeddedChannel} and remove the decompressor
* from the {@link Http2Stream}.
* Release remaining content from the {@link EmbeddedChannel}.
*
* @param stream The stream for which {@code decompressor} is the decompressor for
* @param decompressor The decompressor for {@code stream}
*/
private void cleanup(Http2Stream stream, Http2Decompressor decompressor) {
final EmbeddedChannel channel = decompressor.decompressor();
if (channel.finish()) {
for (;;) {
final ByteBuf buf = channel.readInbound();
if (buf == null) {
break;
}
buf.release();
}
}
decompressor = stream.removeProperty(propertyKey);
private static void cleanup(Http2Decompressor decompressor) {
decompressor.decompressor().finishAndReleaseAll();
}
/**
@ -340,26 +328,18 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
@Override
public boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception {
Http2Decompressor decompressor = decompressor(stream);
Http2Decompressor copy = null;
if (decompressor != null) {
// Convert the decompressed bytes to compressed (on the wire) bytes.
numBytes = decompressor.consumeBytes(stream.id(), numBytes);
}
try {
if (decompressor != null) {
// Make a copy before hand in case any exceptions occur we will roll back the state
copy = new Http2Decompressor(decompressor);
// Convert the uncompressed consumed bytes to compressed (on the wire) bytes.
numBytes = decompressor.consumeProcessedBytes(numBytes);
}
return flowController.consumeBytes(stream, numBytes);
} catch (Http2Exception e) {
if (copy != null) {
stream.setProperty(propertyKey, copy);
}
throw e;
} catch (Throwable t) {
if (copy != null) {
stream.setProperty(propertyKey, copy);
}
throw new Http2Exception(INTERNAL_ERROR,
"Error while returning bytes to flow control window", t);
// The stream should be closed at this point. We have already changed our state tracking the compressed
// bytes, and there is no guarantee we can recover if the underlying flow controller throws.
throw streamError(stream.id(), INTERNAL_ERROR, t, "Error while returning bytes to flow control window");
}
}
@ -379,17 +359,9 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
*/
private static final class Http2Decompressor {
private final EmbeddedChannel decompressor;
private int processed;
private int compressed;
private int decompressed;
Http2Decompressor(Http2Decompressor rhs) {
this(rhs.decompressor);
processed = rhs.processed;
compressed = rhs.compressed;
decompressed = rhs.decompressed;
}
Http2Decompressor(EmbeddedChannel decompressor) {
this.decompressor = decompressor;
}
@ -401,53 +373,49 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
return decompressor;
}
/**
* Increment the number of decompressed bytes processed by the application.
*/
void incrementProcessedBytes(int delta) {
if (processed + delta < 0) {
throw new IllegalArgumentException("processed bytes cannot be negative");
}
processed += delta;
}
/**
* Increment the number of bytes received prior to doing any decompression.
*/
void incrementCompressedBytes(int delta) {
if (compressed + delta < 0) {
throw new IllegalArgumentException("compressed bytes cannot be negative");
}
assert delta >= 0;
compressed += delta;
}
/**
* Increment the number of bytes after the decompression process. Under normal circumstances this
* delta should not exceed {@link Http2Decompressor#processed)}.
* Increment the number of bytes after the decompression process.
*/
void incrementDecompressedByes(int delta) {
if (decompressed + delta < 0) {
throw new IllegalArgumentException("decompressed bytes cannot be negative");
}
void incrementDecompressedBytes(int delta) {
assert delta >= 0;
decompressed += delta;
}
/**
* Decrements {@link Http2Decompressor#processed} by {@code processedBytes} and determines the ratio
* between {@code processedBytes} and {@link Http2Decompressor#decompressed}.
* Determines the ratio between {@code numBytes} and {@link Http2Decompressor#decompressed}.
* This ratio is used to decrement {@link Http2Decompressor#decompressed} and
* {@link Http2Decompressor#compressed}.
* @param processedBytes The number of post-decompressed bytes that have been processed.
* @param streamId the stream ID
* @param decompressedBytes The number of post-decompressed bytes to return to flow control
* @return The number of pre-decompressed bytes that have been consumed.
*/
int consumeProcessedBytes(int processedBytes) {
// Consume the processed bytes first to verify that is is a valid amount
incrementProcessedBytes(-processedBytes);
double consumedRatio = processedBytes / (double) decompressed;
int consumeBytes(int streamId, int decompressedBytes) throws Http2Exception {
if (decompressedBytes < 0) {
throw new IllegalArgumentException("decompressedBytes must not be negative: " + decompressedBytes);
}
if (decompressed - decompressedBytes < 0) {
throw streamError(streamId, INTERNAL_ERROR,
"Attempting to return too many bytes for stream %d. decompressed: %d " +
"decompressedBytes: %d", streamId, decompressed, decompressedBytes);
}
double consumedRatio = decompressedBytes / (double) decompressed;
int consumedCompressed = Math.min(compressed, (int) Math.ceil(compressed * consumedRatio));
incrementDecompressedByes(-Math.min(decompressed, (int) Math.ceil(decompressed * consumedRatio)));
incrementCompressedBytes(-consumedCompressed);
if (compressed - consumedCompressed < 0) {
throw streamError(streamId, INTERNAL_ERROR,
"overflow when converting decompressed bytes to compressed bytes for stream %d." +
"decompressedBytes: %d decompressed: %d compressed: %d consumedCompressed: %d",
streamId, decompressedBytes, decompressed, compressed, consumedCompressed);
}
decompressed -= decompressedBytes;
compressed -= consumedCompressed;
return consumedCompressed;
}