HTTP/2 Decompress Flow Control

Motivation:
The current decompression frame listener currently opts-out of application level flow control. The application should still be able to control flow control even if decompression is in use.

Modifications:
- DecompressorFrameListener will maintain how many compressed bytes, decompressed bytes, and processed by the listener bytes.  A ratio will be used to translate these values into application level flow control amount.

Result:
HTTP/2 decompressor delegates the application level flow control to the listener processing the decompressed data.
This commit is contained in:
Scott Mitchell 2014-11-14 20:37:14 -05:00
parent d2158370fa
commit b83f385017
14 changed files with 559 additions and 294 deletions

View File

@ -41,7 +41,7 @@ public class CompressorHttp2ConnectionEncoder extends DefaultHttp2ConnectionEnco
private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() { private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() {
@Override @Override
public void streamRemoved(Http2Stream stream) { public void streamRemoved(Http2Stream stream) {
final EmbeddedChannel compressor = stream.compressor(); final EmbeddedChannel compressor = stream.getProperty(CompressorHttp2ConnectionEncoder.class);
if (compressor != null) { if (compressor != null) {
cleanup(stream, compressor); cleanup(stream, compressor);
} }
@ -103,19 +103,24 @@ public class CompressorHttp2ConnectionEncoder extends DefaultHttp2ConnectionEnco
public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding, public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
final boolean endOfStream, ChannelPromise promise) { final boolean endOfStream, ChannelPromise promise) {
final Http2Stream stream = connection().stream(streamId); final Http2Stream stream = connection().stream(streamId);
final EmbeddedChannel compressor = stream == null ? null : stream.compressor(); final EmbeddedChannel channel = stream == null ? null :
if (compressor == null) { (EmbeddedChannel) stream.getProperty(CompressorHttp2ConnectionEncoder.class);
if (channel == null) {
// The compressor may be null if no compatible encoding type was found in this stream's headers // The compressor may be null if no compatible encoding type was found in this stream's headers
return super.writeData(ctx, streamId, data, padding, endOfStream, promise); return super.writeData(ctx, streamId, data, padding, endOfStream, promise);
} }
try { try {
// call retain here as it will call release after its written to the channel // call retain here as it will call release after its written to the channel
compressor.writeOutbound(data.retain()); channel.writeOutbound(data.retain());
ByteBuf buf = nextReadableBuf(compressor); ByteBuf buf = nextReadableBuf(channel);
if (buf == null) { if (buf == null) {
if (endOfStream) { if (endOfStream) {
return super.writeData(ctx, streamId, Unpooled.EMPTY_BUFFER, padding, endOfStream, promise); if (channel.finish()) {
buf = nextReadableBuf(channel);
}
return super.writeData(ctx, streamId, buf == null ? Unpooled.EMPTY_BUFFER : buf, padding,
true, promise);
} }
// END_STREAM is not set and the assumption is data is still forthcoming. // END_STREAM is not set and the assumption is data is still forthcoming.
promise.setSuccess(); promise.setSuccess();
@ -123,23 +128,39 @@ public class CompressorHttp2ConnectionEncoder extends DefaultHttp2ConnectionEnco
} }
ChannelPromiseAggregator aggregator = new ChannelPromiseAggregator(promise); ChannelPromiseAggregator aggregator = new ChannelPromiseAggregator(promise);
ChannelPromise bufPromise = ctx.newPromise();
aggregator.add(bufPromise);
for (;;) { for (;;) {
final ByteBuf nextBuf = nextReadableBuf(compressor); ByteBuf nextBuf = nextReadableBuf(channel);
final boolean endOfStreamForBuf = nextBuf == null && endOfStream; boolean compressedEndOfStream = nextBuf == null && endOfStream;
ChannelPromise newPromise = ctx.newPromise(); if (compressedEndOfStream && channel.finish()) {
aggregator.add(newPromise); nextBuf = nextReadableBuf(channel);
compressedEndOfStream = nextBuf == null;
}
super.writeData(ctx, streamId, buf, padding, endOfStreamForBuf, newPromise); final ChannelPromise nextPromise;
if (nextBuf != null) {
// We have to add the nextPromise to the aggregator before doing the current write. This is so
// completing the current write before the next write is done won't complete the aggregate promise
nextPromise = ctx.newPromise();
aggregator.add(nextPromise);
} else {
nextPromise = null;
}
super.writeData(ctx, streamId, buf, padding, compressedEndOfStream, bufPromise);
if (nextBuf == null) { if (nextBuf == null) {
break; break;
} }
padding = 0; // Padding is only communicated once on the first iteration
buf = nextBuf; buf = nextBuf;
bufPromise = nextPromise;
} }
return promise; return promise;
} finally { } finally {
if (endOfStream) { if (endOfStream) {
cleanup(stream, compressor); cleanup(stream, channel);
} }
} }
} }
@ -215,7 +236,7 @@ public class CompressorHttp2ConnectionEncoder extends DefaultHttp2ConnectionEnco
return; return;
} }
EmbeddedChannel compressor = stream.compressor(); EmbeddedChannel compressor = stream.getProperty(CompressorHttp2ConnectionEncoder.class);
if (compressor == null) { if (compressor == null) {
if (!endOfStream) { if (!endOfStream) {
AsciiString encoding = headers.get(CONTENT_ENCODING); AsciiString encoding = headers.get(CONTENT_ENCODING);
@ -225,6 +246,7 @@ public class CompressorHttp2ConnectionEncoder extends DefaultHttp2ConnectionEnco
try { try {
compressor = newContentCompressor(encoding); compressor = newContentCompressor(encoding);
if (compressor != null) { if (compressor != null) {
stream.setProperty(CompressorHttp2ConnectionEncoder.class, compressor);
AsciiString targetContentEncoding = getTargetContentEncoding(encoding); AsciiString targetContentEncoding = getTargetContentEncoding(encoding);
if (IDENTITY.equalsIgnoreCase(targetContentEncoding)) { if (IDENTITY.equalsIgnoreCase(targetContentEncoding)) {
headers.remove(CONTENT_ENCODING); headers.remove(CONTENT_ENCODING);
@ -261,10 +283,11 @@ public class CompressorHttp2ConnectionEncoder extends DefaultHttp2ConnectionEnco
if (buf == null) { if (buf == null) {
break; break;
} }
buf.release(); buf.release();
} }
} }
stream.compressor(null); stream.removeProperty(CompressorHttp2ConnectionEncoder.class);
} }
/** /**

View File

@ -29,7 +29,6 @@ import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http2.Http2StreamRemovalPolicy.Action; import io.netty.handler.codec.http2.Http2StreamRemovalPolicy.Action;
import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap; import io.netty.util.collection.IntObjectMap;
@ -37,9 +36,11 @@ import io.netty.util.collection.IntObjectMap;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
/** /**
@ -217,14 +218,14 @@ public class DefaultHttp2Connection implements Http2Connection {
private boolean resetReceived; private boolean resetReceived;
private boolean endOfStreamSent; private boolean endOfStreamSent;
private boolean endOfStreamReceived; private boolean endOfStreamReceived;
private Http2InboundFlowState inboundFlow; private Http2FlowState inboundFlow;
private Http2FlowState outboundFlow; private Http2FlowState outboundFlow;
private EmbeddedChannel decompressor; private Http2FlowControlWindowManager garbageCollector;
private EmbeddedChannel compressor; private PropertyMap data;
private Object data;
DefaultStream(int id) { DefaultStream(int id) {
this.id = id; this.id = id;
data = new LazyPropertyMap(this);
} }
@Override @Override
@ -287,49 +288,27 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
@Override @Override
public void data(Object data) { public Object setProperty(Object key, Object value) {
this.data = data; return data.put(key, value);
}
@SuppressWarnings("unchecked")
@Override
public <T> T data() {
return (T) data;
} }
@Override @Override
public void decompressor(EmbeddedChannel decompressor) { public <V> V getProperty(Object key) {
if (this.decompressor != null && decompressor != null) { return data.get(key);
throw new IllegalStateException("decompressor can not be reassigned");
}
this.decompressor = decompressor;
} }
@Override @Override
public EmbeddedChannel decompressor() { public <V> V removeProperty(Object key) {
return decompressor; return data.remove(key);
} }
@Override @Override
public void compressor(EmbeddedChannel compressor) { public Http2FlowState inboundFlow() {
if (this.compressor != null && compressor != null) {
throw new IllegalStateException("compressor can not be reassigned");
}
this.compressor = compressor;
}
@Override
public EmbeddedChannel compressor() {
return compressor;
}
@Override
public Http2InboundFlowState inboundFlow() {
return inboundFlow; return inboundFlow;
} }
@Override @Override
public void inboundFlow(Http2InboundFlowState state) { public void inboundFlow(Http2FlowState state) {
inboundFlow = state; inboundFlow = state;
} }
@ -343,6 +322,16 @@ public class DefaultHttp2Connection implements Http2Connection {
outboundFlow = state; outboundFlow = state;
} }
@Override
public Http2FlowControlWindowManager garbageCollector() {
return garbageCollector;
}
@Override
public void garbageCollector(Http2FlowControlWindowManager collector) {
garbageCollector = collector;
}
@Override @Override
public final boolean isRoot() { public final boolean isRoot() {
return parent == null; return parent == null;
@ -597,6 +586,75 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
} }
/**
* Allows the data map to be lazily initialized for {@link DefaultStream}.
*/
private interface PropertyMap {
Object put(Object key, Object value);
<V> V get(Object key);
<V> V remove(Object key);
}
/**
* Provides actual {@link HashMap} functionality for {@link DefaultStream}'s application data.
*/
private static final class DefaultProperyMap implements PropertyMap {
private final Map<Object, Object> data;
DefaultProperyMap(int initialSize) {
data = new HashMap<Object, Object>(initialSize);
}
@Override
public Object put(Object key, Object value) {
return data.put(key, value);
}
@SuppressWarnings("unchecked")
@Override
public <V> V get(Object key) {
return (V) data.get(key);
}
@SuppressWarnings("unchecked")
@Override
public <V> V remove(Object key) {
return (V) data.remove(key);
}
}
/**
* Provides the lazy initialization for the {@link DefaultStream} data map.
*/
private static final class LazyPropertyMap implements PropertyMap {
private static final int DEFAULT_INITIAL_SIZE = 4;
private final DefaultStream stream;
LazyPropertyMap(DefaultStream stream) {
this.stream = stream;
}
@Override
public Object put(Object key, Object value) {
stream.data = new DefaultProperyMap(DEFAULT_INITIAL_SIZE);
return stream.data.put(key, value);
}
@Override
public <V> V get(Object key) {
stream.data = new DefaultProperyMap(DEFAULT_INITIAL_SIZE);
return stream.data.get(key);
}
@Override
public <V> V remove(Object key) {
stream.data = new DefaultProperyMap(DEFAULT_INITIAL_SIZE);
return stream.data.remove(key);
}
}
private static IntObjectMap<DefaultStream> newChildMap() { private static IntObjectMap<DefaultStream> newChildMap() {
return new IntObjectHashMap<DefaultStream>(4); return new IntObjectHashMap<DefaultStream>(4);
} }
@ -604,7 +662,7 @@ public class DefaultHttp2Connection implements Http2Connection {
/** /**
* Allows a correlation to be made between a stream and its old parent before a parent change occurs * Allows a correlation to be made between a stream and its old parent before a parent change occurs
*/ */
private final class ParentChangedEvent { private static final class ParentChangedEvent {
private final Http2Stream stream; private final Http2Stream stream;
private final Http2Stream oldParent; private final Http2Stream oldParent;

View File

@ -66,6 +66,11 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
return this; return this;
} }
@Override
public Http2LifecycleManager lifecycleManager() {
return lifecycleManager;
}
@Override @Override
public Builder inboundFlow(Http2InboundFlowController inboundFlow) { public Builder inboundFlow(Http2InboundFlowController inboundFlow) {
this.inboundFlow = inboundFlow; this.inboundFlow = inboundFlow;
@ -193,7 +198,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
} }
private static int unprocessedBytes(Http2Stream stream) { private static int unprocessedBytes(Http2Stream stream) {
return stream.inboundFlow().unProcessedBytes(); return stream.garbageCollector().unProcessedBytes();
} }
/** /**
@ -284,7 +289,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
} finally { } finally {
// If appropriate, returned the processed bytes to the flow controller. // If appropriate, returned the processed bytes to the flow controller.
if (shouldApplyFlowControl && bytesToReturn > 0) { if (shouldApplyFlowControl && bytesToReturn > 0) {
stream.inboundFlow().returnProcessedBytes(ctx, bytesToReturn); stream.garbageCollector().returnProcessedBytes(ctx, bytesToReturn);
} }
if (endOfStream) { if (endOfStream) {

View File

@ -60,6 +60,11 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
return this; return this;
} }
@Override
public Http2LifecycleManager lifecycleManager() {
return lifecycleManager;
}
@Override @Override
public Builder frameWriter( public Builder frameWriter(
Http2FrameWriter frameWriter) { Http2FrameWriter frameWriter) {

View File

@ -60,13 +60,18 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
this.windowUpdateRatio = windowUpdateRatio; this.windowUpdateRatio = windowUpdateRatio;
// Add a flow state for the connection. // Add a flow state for the connection.
connection.connectionStream().inboundFlow(new FlowState(CONNECTION_STREAM_ID)); final Http2Stream connectionStream = connection.connectionStream();
final FlowState connectionFlowState = new FlowState(connectionStream);
connectionStream.inboundFlow(connectionFlowState);
connectionStream.garbageCollector(connectionFlowState);
// Register for notification of new streams. // Register for notification of new streams.
connection.addListener(new Http2ConnectionAdapter() { connection.addListener(new Http2ConnectionAdapter() {
@Override @Override
public void streamAdded(Http2Stream stream) { public void streamAdded(Http2Stream stream) {
stream.inboundFlow(new FlowState(stream.id())); final FlowState flowState = new FlowState(stream);
stream.inboundFlow(flowState);
stream.garbageCollector(flowState);
} }
}); });
} }
@ -141,8 +146,8 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
/** /**
* Flow control window state for an individual stream. * Flow control window state for an individual stream.
*/ */
private final class FlowState implements Http2InboundFlowState { private final class FlowState implements Http2FlowState, Http2FlowControlWindowManager {
private final int streamId; private final Http2Stream stream;
/** /**
* The actual flow control window that is decremented as soon as {@code DATA} arrives. * The actual flow control window that is decremented as soon as {@code DATA} arrives.
@ -160,8 +165,8 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
private int lowerBound; private int lowerBound;
private boolean endOfStream; private boolean endOfStream;
FlowState(int streamId) { FlowState(Http2Stream stream) {
this.streamId = streamId; this.stream = stream;
window = initialWindowSize; window = initialWindowSize;
processedWindow = window; processedWindow = window;
} }
@ -180,7 +185,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
*/ */
int initialWindowSize() { int initialWindowSize() {
int maxWindowSize = initialWindowSize; int maxWindowSize = initialWindowSize;
if (streamId == CONNECTION_STREAM_ID) { if (stream.id() == CONNECTION_STREAM_ID) {
// Determine the maximum number of streams that we can allow without integer overflow // Determine the maximum number of streams that we can allow without integer overflow
// of maxWindowSize * numStreams. Also take care to avoid division by zero when // of maxWindowSize * numStreams. Also take care to avoid division by zero when
// maxWindowSize == 0. // maxWindowSize == 0.
@ -196,7 +201,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
@Override @Override
public void returnProcessedBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception { public void returnProcessedBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception {
if (streamId == CONNECTION_STREAM_ID) { if (stream.id() == CONNECTION_STREAM_ID) {
throw new UnsupportedOperationException("Returning bytes for the connection window is not supported"); throw new UnsupportedOperationException("Returning bytes for the connection window is not supported");
} }
checkNotNull(ctx, "ctx"); checkNotNull(ctx, "ctx");
@ -214,6 +219,11 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
return processedWindow - window; return processedWindow - window;
} }
@Override
public Http2Stream stream() {
return stream;
}
/** /**
* Updates the flow control window for this stream if it is appropriate. * Updates the flow control window for this stream if it is appropriate.
*/ */
@ -233,13 +243,12 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
*/ */
void returnProcessedBytes(int delta) throws Http2Exception { void returnProcessedBytes(int delta) throws Http2Exception {
if (processedWindow - delta < window) { if (processedWindow - delta < window) {
if (streamId == CONNECTION_STREAM_ID) { if (stream.id() == CONNECTION_STREAM_ID) {
throw new Http2Exception(INTERNAL_ERROR, throw new Http2Exception(INTERNAL_ERROR,
"Attempting to return too many bytes for connection"); "Attempting to return too many bytes for connection");
} else {
throw new Http2StreamException(streamId, INTERNAL_ERROR,
"Attempting to return too many bytes for stream " + streamId);
} }
throw new Http2StreamException(stream.id(), INTERNAL_ERROR,
"Attempting to return too many bytes for stream " + stream.id());
} }
processedWindow -= delta; processedWindow -= delta;
} }
@ -264,10 +273,10 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
// This difference is stored for the connection when writing the SETTINGS frame // This difference is stored for the connection when writing the SETTINGS frame
// and is cleared once we send a WINDOW_UPDATE frame. // and is cleared once we send a WINDOW_UPDATE frame.
if (delta < 0 && window < lowerBound) { if (delta < 0 && window < lowerBound) {
if (streamId == CONNECTION_STREAM_ID) { if (stream.id() == CONNECTION_STREAM_ID) {
throw protocolError("Connection flow control window exceeded"); throw protocolError("Connection flow control window exceeded");
} else { } else {
throw flowControlError("Flow control window exceeded for stream: %d", streamId); throw flowControlError("Flow control window exceeded for stream: %d", stream.id());
} }
} }
@ -286,7 +295,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
void updatedInitialWindowSize(int delta) throws Http2Exception { void updatedInitialWindowSize(int delta) throws Http2Exception {
if (delta > 0 && window > Integer.MAX_VALUE - delta) { if (delta > 0 && window > Integer.MAX_VALUE - delta) {
// Integer overflow. // Integer overflow.
throw flowControlError("Flow control window overflowed for stream: %d", streamId); throw flowControlError("Flow control window overflowed for stream: %d", stream.id());
} }
window += delta; window += delta;
processedWindow += delta; processedWindow += delta;
@ -314,7 +323,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
} }
// Send a window update for the stream/connection. // Send a window update for the stream/connection.
frameWriter.writeWindowUpdate(ctx, streamId, deltaWindowSize, ctx.newPromise()); frameWriter.writeWindowUpdate(ctx, stream.id(), deltaWindowSize, ctx.newPromise());
ctx.flush(); ctx.flush();
} }
} }

View File

@ -29,6 +29,7 @@ import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.compression.ZlibCodecFactory; import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.compression.ZlibWrapper; import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.util.CharsetUtil;
/** /**
* A HTTP2 frame listener that will decompress data frames according to the {@code content-encoding} header for each * A HTTP2 frame listener that will decompress data frames according to the {@code content-encoding} header for each
@ -38,7 +39,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() { private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() {
@Override @Override
public void streamRemoved(Http2Stream stream) { public void streamRemoved(Http2Stream stream) {
final EmbeddedChannel decompressor = stream.decompressor(); final Http2Decompressor decompressor = stream.getProperty(Http2Decompressor.class);
if (decompressor != null) { if (decompressor != null) {
cleanup(stream, decompressor); cleanup(stream, decompressor);
} }
@ -65,45 +66,57 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
throws Http2Exception { throws Http2Exception {
final Http2Stream stream = connection.stream(streamId); final Http2Stream stream = connection.stream(streamId);
final EmbeddedChannel decompressor = stream == null ? null : stream.decompressor(); final Http2Decompressor decompressor = stream == null ? null :
(Http2Decompressor) stream.getProperty(Http2Decompressor.class);
if (decompressor == null) { if (decompressor == null) {
// The decompressor may be null if no compatible encoding type was found in this stream's headers // The decompressor may be null if no compatible encoding type was found in this stream's headers
return listener.onDataRead(ctx, streamId, data, padding, endOfStream); return listener.onDataRead(ctx, streamId, data, padding, endOfStream);
} }
// When decompressing, always opt-out of application-level flow control. final EmbeddedChannel channel = decompressor.decompressor();
// TODO: investigate how to apply application-level flow control when decompressing. final int compressedBytes = data.readableBytes() + padding;
int processedBytes = data.readableBytes() + padding; int processedBytes = 0;
try { decompressor.incrementCompressedBytes(compressedBytes);
// call retain here as it will call release after its written to the channel // call retain here as it will call release after its written to the channel
decompressor.writeInbound(data.retain()); channel.writeInbound(data.retain());
ByteBuf buf = nextReadableBuf(decompressor); ByteBuf buf = nextReadableBuf(channel);
if (buf == null && endOfStream && channel.finish()) {
buf = nextReadableBuf(channel);
}
if (buf == null) { if (buf == null) {
if (endOfStream) { if (endOfStream) {
listener.onDataRead(ctx, streamId, Unpooled.EMPTY_BUFFER, padding, true); listener.onDataRead(ctx, streamId, Unpooled.EMPTY_BUFFER, padding, true);
} }
// END_STREAM is not set and the data could not be decoded yet. // No new decompressed data was extracted from the compressed data. This means the application could not be
// The assumption has to be there will be more data frames to complete the decode. // provided with data and thus could not return how many bytes were processed. We will assume there is more
// We don't have enough information here to know if this is an error. // data coming which will complete the decompression block. To allow for more data we return all bytes to
// the flow control window (so the peer can send more data).
decompressor.incrementDecompressedByes(compressedBytes);
processedBytes = compressedBytes;
} else { } else {
decompressor.incrementDecompressedByes(padding);
for (;;) { for (;;) {
final ByteBuf nextBuf = nextReadableBuf(decompressor); ByteBuf nextBuf = nextReadableBuf(channel);
final boolean endOfStreamForBuf = nextBuf == null && endOfStream; boolean decompressedEndOfStream = nextBuf == null && endOfStream;
if (decompressedEndOfStream && channel.finish()) {
nextBuf = nextReadableBuf(channel);
decompressedEndOfStream = nextBuf == null;
}
listener.onDataRead(ctx, streamId, buf, padding, endOfStreamForBuf); decompressor.incrementDecompressedByes(buf.readableBytes());
processedBytes += listener.onDataRead(ctx, streamId, buf, padding, decompressedEndOfStream);
if (nextBuf == null) { if (nextBuf == null) {
break; break;
} }
padding = 0; // Padding is only communicated once on the first iteration
buf = nextBuf; buf = nextBuf;
} }
} }
decompressor.incrementProcessedBytes(processedBytes);
// The processed bytes will be translated to pre-decompressed byte amounts by DecompressorGarbageCollector
return processedBytes; return processedBytes;
} finally {
if (endOfStream) {
cleanup(stream, decompressor);
}
}
} }
@Override @Override
@ -172,17 +185,18 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
return; return;
} }
EmbeddedChannel decompressor = stream.decompressor(); Http2Decompressor decompressor = stream.getProperty(Http2Decompressor.class);
if (decompressor == null) { if (decompressor == null && !endOfStream) {
if (!endOfStream) {
// Determine the content encoding. // Determine the content encoding.
AsciiString contentEncoding = headers.get(CONTENT_ENCODING); AsciiString contentEncoding = headers.get(CONTENT_ENCODING);
if (contentEncoding == null) { if (contentEncoding == null) {
contentEncoding = IDENTITY; contentEncoding = IDENTITY;
} }
decompressor = newContentDecompressor(contentEncoding); final EmbeddedChannel channel = newContentDecompressor(contentEncoding);
if (decompressor != null) { if (channel != null) {
stream.decompressor(decompressor); decompressor = new Http2Decompressor(channel);
stream.setProperty(Http2Decompressor.class, decompressor);
stream.garbageCollector(new DecompressorGarbageCollector(stream.garbageCollector()));
// Decode the content and remove or replace the existing headers // Decode the content and remove or replace the existing headers
// so that the message looks like a decoded message. // so that the message looks like a decoded message.
AsciiString targetContentEncoding = getTargetContentEncoding(contentEncoding); AsciiString targetContentEncoding = getTargetContentEncoding(contentEncoding);
@ -193,9 +207,6 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
} }
} }
} }
} else if (endOfStream) {
cleanup(stream, decompressor);
}
if (decompressor != null) { if (decompressor != null) {
// The content length will be for the compressed data. Since we will decompress the data // The content length will be for the compressed data. Since we will decompress the data
@ -212,17 +223,22 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
* @param stream The stream for which {@code decompressor} is the decompressor for * @param stream The stream for which {@code decompressor} is the decompressor for
* @param decompressor The decompressor for {@code stream} * @param decompressor The decompressor for {@code stream}
*/ */
private static void cleanup(Http2Stream stream, EmbeddedChannel decompressor) { private static void cleanup(Http2Stream stream, Http2Decompressor decompressor) {
if (decompressor.finish()) { final EmbeddedChannel channel = decompressor.decompressor();
if (channel.finish()) {
for (;;) { for (;;) {
final ByteBuf buf = decompressor.readInbound(); final ByteBuf buf = channel.readInbound();
if (buf == null) { if (buf == null) {
break; break;
} }
buf.release(); buf.release();
} }
} }
stream.decompressor(null); decompressor = stream.removeProperty(Http2Decompressor.class);
if (decompressor != null) {
DecompressorGarbageCollector gc = (DecompressorGarbageCollector) stream.garbageCollector();
stream.garbageCollector(gc.original());
}
} }
/** /**
@ -245,4 +261,128 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
return buf; return buf;
} }
} }
/**
* Garbage collector which translates post-decompression amounts the application knows about
* to pre-decompression amounts that flow control knows about.
*/
private static final class DecompressorGarbageCollector implements Http2FlowControlWindowManager {
private final Http2FlowControlWindowManager original;
DecompressorGarbageCollector(Http2FlowControlWindowManager original) {
this.original = original;
}
@Override
public void returnProcessedBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception {
final Http2Stream stream = stream();
final Http2Decompressor decompressor = stream.getProperty(Http2Decompressor.class);
// Make a copy before hand in case any exceptions occur we will roll back the state
Http2Decompressor copy = new Http2Decompressor(decompressor);
try {
original.returnProcessedBytes(ctx, decompressor.consumeProcessedBytes(numBytes));
} catch (Http2Exception e) {
stream.setProperty(Http2Decompressor.class, copy);
throw e;
} catch (Throwable t) {
stream.setProperty(Http2Decompressor.class, copy);
throw new Http2Exception(Http2Error.INTERNAL_ERROR,
"Error while returning bytes to flow control window", t);
}
}
Http2FlowControlWindowManager original() {
return original;
}
@Override
public int unProcessedBytes() {
return original.unProcessedBytes();
}
@Override
public Http2Stream stream() {
return original.stream();
}
}
/**
* Provides the state for stream {@code DATA} frame decompression.
*/
private static final class Http2Decompressor {
private final EmbeddedChannel decompressor;
private int processed;
private int compressed;
private int decompressed;
Http2Decompressor(Http2Decompressor rhs) {
this(rhs.decompressor);
processed = rhs.processed;
compressed = rhs.compressed;
decompressed = rhs.decompressed;
}
Http2Decompressor(EmbeddedChannel decompressor) {
this.decompressor = decompressor;
}
/**
* Responsible for taking compressed bytes in and producing decompressed bytes.
*/
EmbeddedChannel decompressor() {
return decompressor;
}
/**
* Increment the number of decompressed bytes processed by the application.
*/
void incrementProcessedBytes(int delta) {
if (processed + delta < 0) {
throw new IllegalArgumentException("processed bytes cannot be negative");
}
processed += delta;
}
/**
* Increment the number of bytes received prior to doing any decompression.
*/
void incrementCompressedBytes(int delta) {
if (compressed + delta < 0) {
throw new IllegalArgumentException("compressed bytes cannot be negative");
}
compressed += delta;
}
/**
* Increment the number of bytes after the decompression process. Under normal circumstances this
* delta should not exceed {@link Http2Decompressor#processedBytes()}.
*/
void incrementDecompressedByes(int delta) {
if (decompressed + delta < 0) {
throw new IllegalArgumentException("decompressed bytes cannot be negative");
}
decompressed += delta;
}
/**
* Decrements {@link Http2Decompressor#processedBytes()} by {@code processedBytes} and determines the ratio
* between {@code processedBytes} and {@link Http2Decompressor#decompressedBytes()}.
* This ratio is used to decrement {@link Http2Decompressor#decompressedBytes()} and
* {@link Http2Decompressor#compressedBytes()}.
* @param processedBytes The number of post-decompressed bytes that have been processed.
* @return The number of pre-decompressed bytes that have been consumed.
*/
int consumeProcessedBytes(int processedBytes) {
// Consume the processed bytes first to verify that is is a valid amount
incrementProcessedBytes(-processedBytes);
double consumedRatio = processedBytes / (double) decompressed;
int consumedCompressed = Math.min(compressed, (int) Math.ceil(compressed * consumedRatio));
incrementDecompressedByes(-Math.min(decompressed, (int) Math.ceil(decompressed * consumedRatio)));
incrementCompressedBytes(-consumedCompressed);
return consumedCompressed;
}
}
} }

View File

@ -43,6 +43,11 @@ public interface Http2ConnectionDecoder extends Closeable {
*/ */
Builder lifecycleManager(Http2LifecycleManager lifecycleManager); Builder lifecycleManager(Http2LifecycleManager lifecycleManager);
/**
* Gets the {@link Http2LifecycleManager} to be used when building the decoder.
*/
Http2LifecycleManager lifecycleManager();
/** /**
* Sets the {@link Http2InboundFlowController} to be used when building the decoder. * Sets the {@link Http2InboundFlowController} to be used when building the decoder.
*/ */

View File

@ -40,6 +40,11 @@ public interface Http2ConnectionEncoder extends Http2FrameWriter, Http2OutboundF
*/ */
Builder lifecycleManager(Http2LifecycleManager lifecycleManager); Builder lifecycleManager(Http2LifecycleManager lifecycleManager);
/**
* Gets the {@link Http2LifecycleManager} to be used when building the encoder.
*/
Http2LifecycleManager lifecycleManager();
/** /**
* Sets the {@link Http2FrameWriter} to be used when building the encoder. * Sets the {@link Http2FrameWriter} to be used when building the encoder.
*/ */

View File

@ -79,13 +79,18 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
checkNotNull(decoderBuilder, "decoderBuilder"); checkNotNull(decoderBuilder, "decoderBuilder");
checkNotNull(encoderBuilder, "encoderBuilder"); checkNotNull(encoderBuilder, "encoderBuilder");
// Build the encoder. if (encoderBuilder.lifecycleManager() != decoderBuilder.lifecycleManager()) {
throw new IllegalArgumentException("Encoder and Decoder must share a lifecycle manager");
} else if (encoderBuilder.lifecycleManager() == null) {
encoderBuilder.lifecycleManager(this); encoderBuilder.lifecycleManager(this);
decoderBuilder.lifecycleManager(this);
}
// Build the encoder.
encoder = checkNotNull(encoderBuilder.build(), "encoder"); encoder = checkNotNull(encoderBuilder.build(), "encoder");
// Build the decoder. // Build the decoder.
decoderBuilder.encoder(encoder); decoderBuilder.encoder(encoder);
decoderBuilder.lifecycleManager(this);
decoder = checkNotNull(decoderBuilder.build(), "decoder"); decoder = checkNotNull(decoderBuilder.build(), "decoder");
// Verify that the encoder and decoder use the same connection. // Verify that the encoder and decoder use the same connection.
@ -146,8 +151,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
// The channel just became active - send the connection preface to the remote // The channel just became active - send the connection preface to the remote endpoint.
// endpoint.
sendPreface(ctx); sendPreface(ctx);
super.channelActive(ctx); super.channelActive(ctx);
} }

View File

@ -17,11 +17,9 @@ package io.netty.handler.codec.http2;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
/** /**
* The inbound flow control state for a stream. This object is created and managed by the * Allows data to be returned to the flow control window.
* {@link Http2InboundFlowController}.
*/ */
public interface Http2InboundFlowState extends Http2FlowState { public interface Http2FlowControlWindowManager {
/** /**
* Used by applications that participate in application-level inbound flow control. Allows the * Used by applications that participate in application-level inbound flow control. Allows the
* application to return a number of bytes that has been processed and thereby enabling the * application to return a number of bytes that has been processed and thereby enabling the
@ -38,4 +36,9 @@ public interface Http2InboundFlowState extends Http2FlowState {
* The number of bytes that are outstanding and have not yet been returned to the flow controller. * The number of bytes that are outstanding and have not yet been returned to the flow controller.
*/ */
int unProcessedBytes(); int unProcessedBytes();
/**
* Get the stream that is being managed
*/
Http2Stream stream();
} }

View File

@ -15,8 +15,6 @@
package io.netty.handler.codec.http2; package io.netty.handler.codec.http2;
import io.netty.channel.embedded.EmbeddedChannel;
import java.util.Collection; import java.util.Collection;
/** /**
@ -135,43 +133,29 @@ public interface Http2Stream {
/** /**
* Associates the application-defined data with this stream. * Associates the application-defined data with this stream.
* @return The value that was previously associated with {@code key}, or {@code null} if there was none.
*/ */
void data(Object data); Object setProperty(Object key, Object value);
/** /**
* Returns application-defined data if any was associated with this stream. * Returns application-defined data if any was associated with this stream.
*/ */
<T> T data(); <V> V getProperty(Object key);
/** /**
* Associate an object responsible for decompressing data frames for this stream * Returns and removes application-defined data if any was associated with this stream.
*/ */
void decompressor(EmbeddedChannel decompressor); <V> V removeProperty(Object key);
/**
* Get the object capable of decompressing data frames for this stream
*/
EmbeddedChannel decompressor();
/**
* Associate an object responsible for compressing data frames for this stream
*/
void compressor(EmbeddedChannel decompressor);
/**
* Get the object capable of compressing data frames for this stream
*/
EmbeddedChannel compressor();
/** /**
* Gets the in-bound flow control state for this stream. * Gets the in-bound flow control state for this stream.
*/ */
Http2InboundFlowState inboundFlow(); Http2FlowState inboundFlow();
/** /**
* Sets the in-bound flow control state for this stream. * Sets the in-bound flow control state for this stream.
*/ */
void inboundFlow(Http2InboundFlowState state); void inboundFlow(Http2FlowState state);
/** /**
* Gets the out-bound flow control window for this stream. * Gets the out-bound flow control window for this stream.
@ -183,6 +167,16 @@ public interface Http2Stream {
*/ */
void outboundFlow(Http2FlowState state); void outboundFlow(Http2FlowState state);
/**
* Gets the interface which allows bytes to be returned to the flow controller
*/
Http2FlowControlWindowManager garbageCollector();
/**
* Sets the interface which allows bytes to be returned to the flow controller
*/
void garbageCollector(Http2FlowControlWindowManager collector);
/** /**
* Updates an priority for this stream. Calling this method may affect the straucture of the * Updates an priority for this stream. Calling this method may affect the straucture of the
* priority tree. * priority tree.

View File

@ -19,47 +19,50 @@ import static io.netty.handler.codec.http2.Http2TestUtil.as;
import static io.netty.handler.codec.http2.Http2TestUtil.runInChannel; import static io.netty.handler.codec.http2.Http2TestUtil.runInChannel;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.*; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.AsciiString; import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http2.Http2TestUtil.FrameAdapter; import io.netty.handler.codec.http2.Http2TestUtil.FrameAdapter;
import io.netty.handler.codec.http2.Http2TestUtil.FrameCountDown;
import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable; import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
import io.netty.util.CharsetUtil;
import io.netty.util.NetUtil; import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.List; import java.util.Random;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.MockitoAnnotations; import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/** /**
* Test for data decompression in the HTTP/2 codec. * Test for data decompression in the HTTP/2 codec.
@ -68,18 +71,12 @@ public class DataCompressionHttp2Test {
private static final AsciiString GET = as("GET"); private static final AsciiString GET = as("GET");
private static final AsciiString POST = as("POST"); private static final AsciiString POST = as("POST");
private static final AsciiString PATH = as("/some/path"); private static final AsciiString PATH = as("/some/path");
private List<ByteBuf> dataCapture;
@Mock @Mock
private Http2FrameListener serverListener; private Http2FrameListener serverListener;
@Mock @Mock
private Http2FrameListener clientListener; private Http2FrameListener clientListener;
@Mock
private Http2LifecycleManager serverLifeCycleManager;
@Mock
private Http2LifecycleManager clientLifeCycleManager;
private ByteBufAllocator alloc;
private Http2ConnectionEncoder serverEncoder; private Http2ConnectionEncoder serverEncoder;
private Http2ConnectionEncoder clientEncoder; private Http2ConnectionEncoder clientEncoder;
private ServerBootstrap sb; private ServerBootstrap sb;
@ -89,23 +86,22 @@ public class DataCompressionHttp2Test {
private Channel clientChannel; private Channel clientChannel;
private volatile CountDownLatch serverLatch; private volatile CountDownLatch serverLatch;
private volatile CountDownLatch clientLatch; private volatile CountDownLatch clientLatch;
private FrameAdapter serverAdapter;
private FrameAdapter clientAdapter;
private Http2Connection serverConnection; private Http2Connection serverConnection;
private Http2Connection clientConnection;
private ByteArrayOutputStream serverOut;
@Before @Before
public void setup() throws InterruptedException { public void setup() throws InterruptedException {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
} }
@After
public void cleaup() throws IOException {
serverOut.close();
}
@After @After
public void teardown() throws InterruptedException { public void teardown() throws InterruptedException {
if (dataCapture != null) {
for (int i = 0; i < dataCapture.size(); ++i) {
dataCapture.get(i).release();
}
dataCapture = null;
}
serverChannel.close().sync(); serverChannel.close().sync();
Future<?> serverGroup = sb.group().shutdownGracefully(0, 0, MILLISECONDS); Future<?> serverGroup = sb.group().shutdownGracefully(0, 0, MILLISECONDS);
Future<?> serverChildGroup = sb.childGroup().shutdownGracefully(0, 0, MILLISECONDS); Future<?> serverChildGroup = sb.childGroup().shutdownGracefully(0, 0, MILLISECONDS);
@ -113,19 +109,18 @@ public class DataCompressionHttp2Test {
serverGroup.sync(); serverGroup.sync();
serverChildGroup.sync(); serverChildGroup.sync();
clientGroup.sync(); clientGroup.sync();
serverAdapter = null;
clientAdapter = null;
serverConnection = null;
} }
@Test @Test
public void justHeadersNoData() throws Exception { public void justHeadersNoData() throws Exception {
bootstrapEnv(1, 1); bootstrapEnv(1, 0, 1);
final Http2Headers headers = new DefaultHttp2Headers().method(GET).path(PATH) final Http2Headers headers = new DefaultHttp2Headers().method(GET).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP); .set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
// Required because the decompressor intercepts the onXXXRead events before // Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does. // our {@link Http2TestUtil$FrameAdapter} does.
FrameAdapter.getOrCreateStream(serverConnection, 3, false); FrameAdapter.getOrCreateStream(serverConnection, 3, false);
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() { runInChannel(clientChannel, new Http2Runnable() {
@Override @Override
public void run() { public void run() {
@ -140,131 +135,109 @@ public class DataCompressionHttp2Test {
@Test @Test
public void gzipEncodingSingleEmptyMessage() throws Exception { public void gzipEncodingSingleEmptyMessage() throws Exception {
bootstrapEnv(2, 1);
final String text = ""; final String text = "";
final ByteBuf data = Unpooled.copiedBuffer(text.getBytes()); final ByteBuf data = Unpooled.copiedBuffer(text.getBytes());
final EmbeddedChannel encoder = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP)); bootstrapEnv(1, data.readableBytes(), 1);
try { try {
final ByteBuf encodedData = encodeData(data, encoder);
final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH) final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP); .set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
// Required because the decompressor intercepts the onXXXRead events before // Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does. // our {@link Http2TestUtil$FrameAdapter} does.
FrameAdapter.getOrCreateStream(serverConnection, 3, false); Http2Stream stream = FrameAdapter.getOrCreateStream(serverConnection, 3, false);
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() { runInChannel(clientChannel, new Http2Runnable() {
@Override @Override
public void run() { public void run() {
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient()); clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, encodedData, 0, true, newPromiseClient()); clientEncoder.writeData(ctxClient(), 3, data, 0, true, newPromiseClient());
ctxClient().flush(); ctxClient().flush();
} }
}); });
awaitServer(); awaitServer();
data.resetReaderIndex(); assertEquals(0, stream.garbageCollector().unProcessedBytes());
ArgumentCaptor<ByteBuf> dataCaptor = ArgumentCaptor.forClass(ByteBuf.class); assertEquals(text, serverOut.toString(CharsetUtil.UTF_8.name()));
verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), eq(0),
eq(true));
dataCapture = dataCaptor.getAllValues();
assertEquals(data, dataCapture.get(0));
} finally { } finally {
data.release(); data.release();
cleanupEncoder(encoder);
} }
} }
@Test @Test
public void gzipEncodingSingleMessage() throws Exception { public void gzipEncodingSingleMessage() throws Exception {
bootstrapEnv(2, 1);
final String text = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbccccccccccccccccccccccc"; final String text = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbccccccccccccccccccccccc";
final ByteBuf data = Unpooled.copiedBuffer(text.getBytes()); final ByteBuf data = Unpooled.copiedBuffer(text.getBytes());
final EmbeddedChannel encoder = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP)); bootstrapEnv(1, data.readableBytes(), 1);
try { try {
final ByteBuf encodedData = encodeData(data, encoder);
final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH) final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP); .set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
// Required because the decompressor intercepts the onXXXRead events before // Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does. // our {@link Http2TestUtil$FrameAdapter} does.
FrameAdapter.getOrCreateStream(serverConnection, 3, false); Http2Stream stream = FrameAdapter.getOrCreateStream(serverConnection, 3, false);
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() { runInChannel(clientChannel, new Http2Runnable() {
@Override @Override
public void run() { public void run() {
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient()); clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, encodedData, 0, true, newPromiseClient()); clientEncoder.writeData(ctxClient(), 3, data, 0, true, newPromiseClient());
ctxClient().flush(); ctxClient().flush();
} }
}); });
awaitServer(); awaitServer();
data.resetReaderIndex(); assertEquals(0, stream.garbageCollector().unProcessedBytes());
ArgumentCaptor<ByteBuf> dataCaptor = ArgumentCaptor.forClass(ByteBuf.class); assertEquals(text, serverOut.toString(CharsetUtil.UTF_8.name()));
verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), eq(0),
eq(true));
dataCapture = dataCaptor.getAllValues();
assertEquals(data, dataCapture.get(0));
} finally { } finally {
data.release(); data.release();
cleanupEncoder(encoder);
} }
} }
@Test @Test
public void gzipEncodingMultipleMessages() throws Exception { public void gzipEncodingMultipleMessages() throws Exception {
bootstrapEnv(3, 1);
final String text1 = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbccccccccccccccccccccccc"; final String text1 = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbccccccccccccccccccccccc";
final String text2 = "dddddddddddddddddddeeeeeeeeeeeeeeeeeeeffffffffffffffffffff"; final String text2 = "dddddddddddddddddddeeeeeeeeeeeeeeeeeeeffffffffffffffffffff";
final ByteBuf data1 = Unpooled.copiedBuffer(text1.getBytes()); final ByteBuf data1 = Unpooled.copiedBuffer(text1.getBytes());
final ByteBuf data2 = Unpooled.copiedBuffer(text2.getBytes()); final ByteBuf data2 = Unpooled.copiedBuffer(text2.getBytes());
final EmbeddedChannel encoder = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP)); bootstrapEnv(1, data1.readableBytes() + data2.readableBytes(), 1);
try { try {
final ByteBuf encodedData1 = encodeData(data1, encoder);
final ByteBuf encodedData2 = encodeData(data2, encoder);
final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH) final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP); .set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
// Required because the decompressor intercepts the onXXXRead events before // Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does. // our {@link Http2TestUtil$FrameAdapter} does.
FrameAdapter.getOrCreateStream(serverConnection, 3, false); Http2Stream stream = FrameAdapter.getOrCreateStream(serverConnection, 3, false);
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() { runInChannel(clientChannel, new Http2Runnable() {
@Override @Override
public void run() { public void run() {
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient()); clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, encodedData1, 0, false, newPromiseClient()); clientEncoder.writeData(ctxClient(), 3, data1, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, encodedData2, 0, true, newPromiseClient()); clientEncoder.writeData(ctxClient(), 3, data2, 0, true, newPromiseClient());
ctxClient().flush(); ctxClient().flush();
} }
}); });
awaitServer(); awaitServer();
data1.resetReaderIndex(); assertEquals(0, stream.garbageCollector().unProcessedBytes());
data2.resetReaderIndex(); assertEquals(new StringBuilder(text1).append(text2).toString(),
ArgumentCaptor<ByteBuf> dataCaptor = ArgumentCaptor.forClass(ByteBuf.class); serverOut.toString(CharsetUtil.UTF_8.name()));
ArgumentCaptor<Boolean> endStreamCaptor = ArgumentCaptor.forClass(Boolean.class);
verify(serverListener, times(2)).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(),
eq(0), endStreamCaptor.capture());
dataCapture = dataCaptor.getAllValues();
assertEquals(data1, dataCapture.get(0));
assertEquals(data2, dataCapture.get(1));
List<Boolean> endStreamCapture = endStreamCaptor.getAllValues();
assertFalse(endStreamCapture.get(0));
assertTrue(endStreamCapture.get(1));
} finally { } finally {
data1.release(); data1.release();
data2.release(); data2.release();
cleanupEncoder(encoder);
} }
} }
@Test @Test
public void deflateEncodingSingleLargeMessageReducedWindow() throws Exception { public void deflateEncodingSingleLargeMessageReducedWindow() throws Exception {
bootstrapEnv(3, 1);
final int BUFFER_SIZE = 1 << 16; final int BUFFER_SIZE = 1 << 16;
bootstrapEnv(1, BUFFER_SIZE, 1);
final ByteBuf data = Unpooled.buffer(BUFFER_SIZE); final ByteBuf data = Unpooled.buffer(BUFFER_SIZE);
final EmbeddedChannel encoder = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.ZLIB));
try { try {
for (int i = 0; i < data.capacity(); ++i) { for (int i = 0; i < data.capacity(); ++i) {
data.writeByte((byte) 'a'); data.writeByte((byte) 'a');
} }
final ByteBuf encodedData = encodeData(data, encoder);
final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH) final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.DEFLATE); .set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.DEFLATE);
final Http2Settings settings = new Http2Settings(); final Http2Settings settings = new Http2Settings();
// Assume the compression operation will reduce the size by at least 10 bytes // Assume the compression operation will reduce the size by at least 10 bytes
settings.initialWindowSize(BUFFER_SIZE - 10); settings.initialWindowSize(BUFFER_SIZE - 10);
runInChannel(serverConnectedChannel, new Http2Runnable() { runInChannel(serverConnectedChannel, new Http2Runnable() {
@ -278,86 +251,123 @@ public class DataCompressionHttp2Test {
// Required because the decompressor intercepts the onXXXRead events before // Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does. // our {@link Http2TestUtil$FrameAdapter} does.
FrameAdapter.getOrCreateStream(serverConnection, 3, false); Http2Stream stream = FrameAdapter.getOrCreateStream(serverConnection, 3, false);
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() { runInChannel(clientChannel, new Http2Runnable() {
@Override @Override
public void run() { public void run() {
clientEncoder.writeSettings(ctxClient(), settings, newPromiseClient()); clientEncoder.writeSettings(ctxClient(), settings, newPromiseClient());
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient()); clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, encodedData, 0, true, newPromiseClient()); clientEncoder.writeData(ctxClient(), 3, data, 0, true, newPromiseClient());
ctxClient().flush(); ctxClient().flush();
} }
}); });
awaitServer(); awaitServer();
data.resetReaderIndex(); assertEquals(0, stream.garbageCollector().unProcessedBytes());
ArgumentCaptor<ByteBuf> dataCaptor = ArgumentCaptor.forClass(ByteBuf.class); assertEquals(data.resetReaderIndex().toString(CharsetUtil.UTF_8),
verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), eq(0), serverOut.toString(CharsetUtil.UTF_8.name()));
eq(true));
dataCapture = dataCaptor.getAllValues();
assertEquals(data, dataCapture.get(0));
} finally { } finally {
data.release(); data.release();
cleanupEncoder(encoder);
} }
} }
private ByteBuf encodeData(ByteBuf data, EmbeddedChannel encoder) { @Test
ByteBuf encoded = alloc.buffer(data.readableBytes()); public void deflateEncodingMultipleWriteLargeMessageReducedWindow() throws Exception {
encoder.writeOutbound(data.retain()); final int BUFFER_SIZE = 1 << 12;
for (;;) { final byte[] bytes = new byte[BUFFER_SIZE];
final ByteBuf buf = encoder.readOutbound(); new Random().nextBytes(bytes);
if (buf == null) { bootstrapEnv(1, BUFFER_SIZE, 1);
break; final ByteBuf data = Unpooled.wrappedBuffer(bytes);
} try {
if (!buf.isReadable()) { final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
buf.release(); .set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.DEFLATE);
continue; final Http2Settings settings = new Http2Settings();
}
encoded.writeBytes(buf);
buf.release();
}
return encoded;
}
private static void cleanupEncoder(EmbeddedChannel encoder) { settings.initialWindowSize(BUFFER_SIZE / 2);
if (encoder.finish()) { runInChannel(serverConnectedChannel, new Http2Runnable() {
for (;;) { @Override
final ByteBuf buf = encoder.readOutbound(); public void run() {
if (buf == null) { serverEncoder.writeSettings(ctxServer(), settings, newPromiseServer());
break; ctxServer().flush();
} }
buf.release(); });
awaitClient();
// Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does.
Http2Stream stream = FrameAdapter.getOrCreateStream(serverConnection, 3, false);
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
clientEncoder.writeSettings(ctxClient(), settings, newPromiseClient());
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, data, 0, true, newPromiseClient());
ctxClient().flush();
} }
});
awaitServer();
assertEquals(0, stream.garbageCollector().unProcessedBytes());
assertEquals(data.resetReaderIndex().toString(CharsetUtil.UTF_8),
serverOut.toString(CharsetUtil.UTF_8.name()));
} finally {
data.release();
} }
} }
private void bootstrapEnv(int serverCountDown, int clientCountDown) throws Exception { private void bootstrapEnv(int serverHalfClosedCount, int serverOutSize, int clientCount) throws Exception {
alloc = UnpooledByteBufAllocator.DEFAULT; serverOut = new ByteArrayOutputStream(serverOutSize);
serverLatch = new CountDownLatch(serverHalfClosedCount);
clientLatch = new CountDownLatch(clientCount);
sb = new ServerBootstrap(); sb = new ServerBootstrap();
cb = new Bootstrap(); cb = new Bootstrap();
serverLatch = new CountDownLatch(serverCountDown); // Streams are created before the normal flow for this test, so these connection must be initialized up front.
clientLatch = new CountDownLatch(clientCountDown);
serverConnection = new DefaultHttp2Connection(true); serverConnection = new DefaultHttp2Connection(true);
clientConnection = new DefaultHttp2Connection(false);
final CountDownLatch latch = new CountDownLatch(1); serverConnection.addListener(new Http2ConnectionAdapter() {
@Override
public void streamHalfClosed(Http2Stream stream) {
serverLatch.countDown();
}
});
doAnswer(new Answer<Integer>() {
@Override
public Integer answer(InvocationOnMock in) throws Throwable {
ByteBuf buf = (ByteBuf) in.getArguments()[2];
int padding = (Integer) in.getArguments()[3];
int processedBytes = buf.readableBytes() + padding;
buf.readBytes(serverOut, buf.readableBytes());
buf.release();
return processedBytes;
}
}).when(serverListener).onDataRead(any(ChannelHandlerContext.class), anyInt(),
any(ByteBuf.class), anyInt(), anyBoolean());
final CountDownLatch serverChannelLatch = new CountDownLatch(1);
sb.group(new NioEventLoopGroup(), new NioEventLoopGroup()); sb.group(new NioEventLoopGroup(), new NioEventLoopGroup());
sb.channel(NioServerSocketChannel.class); sb.channel(NioServerSocketChannel.class);
sb.childHandler(new ChannelInitializer<Channel>() { sb.childHandler(new ChannelInitializer<Channel>() {
@Override @Override
protected void initChannel(Channel ch) throws Exception { protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline(); ChannelPipeline p = ch.pipeline();
CompressorHttp2ConnectionEncoder.Builder builder = new CompressorHttp2ConnectionEncoder.Builder();
Http2FrameWriter writer = new DefaultHttp2FrameWriter(); Http2FrameWriter writer = new DefaultHttp2FrameWriter();
serverEncoder = builder.connection(serverConnection).frameWriter(writer) Http2ConnectionHandler connectionHandler = new Http2ConnectionHandler(
.outboundFlow(new DefaultHttp2OutboundFlowController(serverConnection, writer)) new DefaultHttp2ConnectionDecoder.Builder()
.lifecycleManager(serverLifeCycleManager).build(); .connection(serverConnection)
serverAdapter = new FrameAdapter(serverConnection, new DelegatingDecompressorFrameListener( .frameReader(new DefaultHttp2FrameReader())
serverConnection, serverListener), serverLatch); .inboundFlow(new DefaultHttp2InboundFlowController(serverConnection, writer))
p.addLast("reader", serverAdapter); .listener(new DelegatingDecompressorFrameListener(serverConnection, serverListener)),
p.addLast(Http2CodecUtil.ignoreSettingsHandler()); new CompressorHttp2ConnectionEncoder.Builder().connection(serverConnection).frameWriter(writer)
.outboundFlow(new DefaultHttp2OutboundFlowController(serverConnection, writer)));
serverEncoder = connectionHandler.encoder();
serverConnectedChannel = ch; serverConnectedChannel = ch;
latch.countDown(); p.addLast(connectionHandler);
p.addLast(Http2CodecUtil.ignoreSettingsHandler());
serverChannelLatch.countDown();
} }
}); });
@ -367,14 +377,18 @@ public class DataCompressionHttp2Test {
@Override @Override
protected void initChannel(Channel ch) throws Exception { protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline(); ChannelPipeline p = ch.pipeline();
Http2Connection connection = new DefaultHttp2Connection(false); FrameCountDown clientFrameCountDown = new FrameCountDown(clientListener, clientLatch);
Http2FrameWriter writer = new DefaultHttp2FrameWriter(); Http2FrameWriter writer = new DefaultHttp2FrameWriter();
CompressorHttp2ConnectionEncoder.Builder builder = new CompressorHttp2ConnectionEncoder.Builder(); Http2ConnectionHandler connectionHandler = new Http2ConnectionHandler(
clientEncoder = builder.connection(connection).frameWriter(writer) new DefaultHttp2ConnectionDecoder.Builder()
.outboundFlow(new DefaultHttp2OutboundFlowController(connection, writer)) .connection(clientConnection)
.lifecycleManager(clientLifeCycleManager).build(); .frameReader(new DefaultHttp2FrameReader())
clientAdapter = new FrameAdapter(connection, clientListener, clientLatch); .inboundFlow(new DefaultHttp2InboundFlowController(clientConnection, writer))
p.addLast("reader", clientAdapter); .listener(new DelegatingDecompressorFrameListener(clientConnection, clientFrameCountDown)),
new CompressorHttp2ConnectionEncoder.Builder().connection(clientConnection).frameWriter(writer)
.outboundFlow(new DefaultHttp2OutboundFlowController(clientConnection, writer)));
clientEncoder = connectionHandler.encoder();
p.addLast(connectionHandler);
p.addLast(Http2CodecUtil.ignoreSettingsHandler()); p.addLast(Http2CodecUtil.ignoreSettingsHandler());
} }
}); });
@ -385,16 +399,16 @@ public class DataCompressionHttp2Test {
ChannelFuture ccf = cb.connect(new InetSocketAddress(NetUtil.LOCALHOST, port)); ChannelFuture ccf = cb.connect(new InetSocketAddress(NetUtil.LOCALHOST, port));
assertTrue(ccf.awaitUninterruptibly().isSuccess()); assertTrue(ccf.awaitUninterruptibly().isSuccess());
clientChannel = ccf.channel(); clientChannel = ccf.channel();
assertTrue(serverChannelLatch.await(5, SECONDS));
assertTrue(latch.await(5, SECONDS));
} }
private void awaitClient() throws Exception { private void awaitClient() throws Exception {
clientLatch.await(5, SECONDS); assertTrue(clientLatch.await(5, SECONDS));
} }
private void awaitServer() throws Exception { private void awaitServer() throws Exception {
serverLatch.await(5, SECONDS); assertTrue(serverLatch.await(5, SECONDS));
serverOut.flush();
} }
private ChannelHandlerContext ctxClient() { private ChannelHandlerContext ctxClient() {

View File

@ -105,7 +105,7 @@ public class DefaultHttp2ConnectionDecoderTest {
private Http2ConnectionEncoder encoder; private Http2ConnectionEncoder encoder;
@Mock @Mock
private Http2InboundFlowState inFlowState; private Http2FlowControlWindowManager inFlowState;
@Mock @Mock
private Http2LifecycleManager lifecycleManager; private Http2LifecycleManager lifecycleManager;
@ -119,7 +119,7 @@ public class DefaultHttp2ConnectionDecoderTest {
when(channel.isActive()).thenReturn(true); when(channel.isActive()).thenReturn(true);
when(stream.id()).thenReturn(STREAM_ID); when(stream.id()).thenReturn(STREAM_ID);
when(stream.state()).thenReturn(OPEN); when(stream.state()).thenReturn(OPEN);
when(stream.inboundFlow()).thenReturn(inFlowState); when(stream.garbageCollector()).thenReturn(inFlowState);
when(pushStream.id()).thenReturn(PUSH_STREAM_ID); when(pushStream.id()).thenReturn(PUSH_STREAM_ID);
when(connection.activeStreams()).thenReturn(Collections.singletonList(stream)); when(connection.activeStreams()).thenReturn(Collections.singletonList(stream));
when(connection.stream(STREAM_ID)).thenReturn(stream); when(connection.stream(STREAM_ID)).thenReturn(stream);

View File

@ -219,7 +219,7 @@ public class DefaultHttp2InboundFlowControllerTest {
} }
private void returnProcessedBytes(int streamId, int processedBytes) throws Http2Exception { private void returnProcessedBytes(int streamId, int processedBytes) throws Http2Exception {
connection.requireStream(streamId).inboundFlow().returnProcessedBytes(ctx, processedBytes); connection.requireStream(streamId).garbageCollector().returnProcessedBytes(ctx, processedBytes);
} }
private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) throws Http2Exception { private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) throws Http2Exception {