diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/CompressorHttp2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/CompressorHttp2ConnectionEncoder.java index e459c7b35c..3548609eac 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/CompressorHttp2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/CompressorHttp2ConnectionEncoder.java @@ -41,7 +41,7 @@ public class CompressorHttp2ConnectionEncoder extends DefaultHttp2ConnectionEnco private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() { @Override public void streamRemoved(Http2Stream stream) { - final EmbeddedChannel compressor = stream.compressor(); + final EmbeddedChannel compressor = stream.getProperty(CompressorHttp2ConnectionEncoder.class); if (compressor != null) { cleanup(stream, compressor); } @@ -103,19 +103,24 @@ public class CompressorHttp2ConnectionEncoder extends DefaultHttp2ConnectionEnco public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding, final boolean endOfStream, ChannelPromise promise) { final Http2Stream stream = connection().stream(streamId); - final EmbeddedChannel compressor = stream == null ? null : stream.compressor(); - if (compressor == null) { + final EmbeddedChannel channel = stream == null ? null : + (EmbeddedChannel) stream.getProperty(CompressorHttp2ConnectionEncoder.class); + if (channel == null) { // The compressor may be null if no compatible encoding type was found in this stream's headers return super.writeData(ctx, streamId, data, padding, endOfStream, promise); } try { // call retain here as it will call release after its written to the channel - compressor.writeOutbound(data.retain()); - ByteBuf buf = nextReadableBuf(compressor); + channel.writeOutbound(data.retain()); + ByteBuf buf = nextReadableBuf(channel); if (buf == null) { if (endOfStream) { - return super.writeData(ctx, streamId, Unpooled.EMPTY_BUFFER, padding, endOfStream, promise); + if (channel.finish()) { + buf = nextReadableBuf(channel); + } + return super.writeData(ctx, streamId, buf == null ? Unpooled.EMPTY_BUFFER : buf, padding, + true, promise); } // END_STREAM is not set and the assumption is data is still forthcoming. promise.setSuccess(); @@ -123,23 +128,39 @@ public class CompressorHttp2ConnectionEncoder extends DefaultHttp2ConnectionEnco } ChannelPromiseAggregator aggregator = new ChannelPromiseAggregator(promise); + ChannelPromise bufPromise = ctx.newPromise(); + aggregator.add(bufPromise); for (;;) { - final ByteBuf nextBuf = nextReadableBuf(compressor); - final boolean endOfStreamForBuf = nextBuf == null && endOfStream; - ChannelPromise newPromise = ctx.newPromise(); - aggregator.add(newPromise); + ByteBuf nextBuf = nextReadableBuf(channel); + boolean compressedEndOfStream = nextBuf == null && endOfStream; + if (compressedEndOfStream && channel.finish()) { + nextBuf = nextReadableBuf(channel); + compressedEndOfStream = nextBuf == null; + } - super.writeData(ctx, streamId, buf, padding, endOfStreamForBuf, newPromise); + final ChannelPromise nextPromise; + if (nextBuf != null) { + // We have to add the nextPromise to the aggregator before doing the current write. This is so + // completing the current write before the next write is done won't complete the aggregate promise + nextPromise = ctx.newPromise(); + aggregator.add(nextPromise); + } else { + nextPromise = null; + } + + super.writeData(ctx, streamId, buf, padding, compressedEndOfStream, bufPromise); if (nextBuf == null) { break; } + padding = 0; // Padding is only communicated once on the first iteration buf = nextBuf; + bufPromise = nextPromise; } return promise; } finally { if (endOfStream) { - cleanup(stream, compressor); + cleanup(stream, channel); } } } @@ -215,7 +236,7 @@ public class CompressorHttp2ConnectionEncoder extends DefaultHttp2ConnectionEnco return; } - EmbeddedChannel compressor = stream.compressor(); + EmbeddedChannel compressor = stream.getProperty(CompressorHttp2ConnectionEncoder.class); if (compressor == null) { if (!endOfStream) { AsciiString encoding = headers.get(CONTENT_ENCODING); @@ -225,6 +246,7 @@ public class CompressorHttp2ConnectionEncoder extends DefaultHttp2ConnectionEnco try { compressor = newContentCompressor(encoding); if (compressor != null) { + stream.setProperty(CompressorHttp2ConnectionEncoder.class, compressor); AsciiString targetContentEncoding = getTargetContentEncoding(encoding); if (IDENTITY.equalsIgnoreCase(targetContentEncoding)) { headers.remove(CONTENT_ENCODING); @@ -261,10 +283,11 @@ public class CompressorHttp2ConnectionEncoder extends DefaultHttp2ConnectionEnco if (buf == null) { break; } + buf.release(); } } - stream.compressor(null); + stream.removeProperty(CompressorHttp2ConnectionEncoder.class); } /** diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java index 8102a79991..b9a718d2c3 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java @@ -29,7 +29,6 @@ import static io.netty.handler.codec.http2.Http2Stream.State.OPEN; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE; import static io.netty.util.internal.ObjectUtil.checkNotNull; -import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.http2.Http2StreamRemovalPolicy.Action; import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectMap; @@ -37,9 +36,11 @@ import io.netty.util.collection.IntObjectMap; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -217,14 +218,14 @@ public class DefaultHttp2Connection implements Http2Connection { private boolean resetReceived; private boolean endOfStreamSent; private boolean endOfStreamReceived; - private Http2InboundFlowState inboundFlow; + private Http2FlowState inboundFlow; private Http2FlowState outboundFlow; - private EmbeddedChannel decompressor; - private EmbeddedChannel compressor; - private Object data; + private Http2FlowControlWindowManager garbageCollector; + private PropertyMap data; DefaultStream(int id) { this.id = id; + data = new LazyPropertyMap(this); } @Override @@ -287,49 +288,27 @@ public class DefaultHttp2Connection implements Http2Connection { } @Override - public void data(Object data) { - this.data = data; - } - - @SuppressWarnings("unchecked") - @Override - public T data() { - return (T) data; + public Object setProperty(Object key, Object value) { + return data.put(key, value); } @Override - public void decompressor(EmbeddedChannel decompressor) { - if (this.decompressor != null && decompressor != null) { - throw new IllegalStateException("decompressor can not be reassigned"); - } - this.decompressor = decompressor; + public V getProperty(Object key) { + return data.get(key); } @Override - public EmbeddedChannel decompressor() { - return decompressor; + public V removeProperty(Object key) { + return data.remove(key); } @Override - public void compressor(EmbeddedChannel compressor) { - if (this.compressor != null && compressor != null) { - throw new IllegalStateException("compressor can not be reassigned"); - } - this.compressor = compressor; - } - - @Override - public EmbeddedChannel compressor() { - return compressor; - } - - @Override - public Http2InboundFlowState inboundFlow() { + public Http2FlowState inboundFlow() { return inboundFlow; } @Override - public void inboundFlow(Http2InboundFlowState state) { + public void inboundFlow(Http2FlowState state) { inboundFlow = state; } @@ -343,6 +322,16 @@ public class DefaultHttp2Connection implements Http2Connection { outboundFlow = state; } + @Override + public Http2FlowControlWindowManager garbageCollector() { + return garbageCollector; + } + + @Override + public void garbageCollector(Http2FlowControlWindowManager collector) { + garbageCollector = collector; + } + @Override public final boolean isRoot() { return parent == null; @@ -597,6 +586,75 @@ public class DefaultHttp2Connection implements Http2Connection { } } + /** + * Allows the data map to be lazily initialized for {@link DefaultStream}. + */ + private interface PropertyMap { + Object put(Object key, Object value); + + V get(Object key); + + V remove(Object key); + } + + /** + * Provides actual {@link HashMap} functionality for {@link DefaultStream}'s application data. + */ + private static final class DefaultProperyMap implements PropertyMap { + private final Map data; + + DefaultProperyMap(int initialSize) { + data = new HashMap(initialSize); + } + + @Override + public Object put(Object key, Object value) { + return data.put(key, value); + } + + @SuppressWarnings("unchecked") + @Override + public V get(Object key) { + return (V) data.get(key); + } + + @SuppressWarnings("unchecked") + @Override + public V remove(Object key) { + return (V) data.remove(key); + } + } + + /** + * Provides the lazy initialization for the {@link DefaultStream} data map. + */ + private static final class LazyPropertyMap implements PropertyMap { + private static final int DEFAULT_INITIAL_SIZE = 4; + private final DefaultStream stream; + + LazyPropertyMap(DefaultStream stream) { + this.stream = stream; + } + + @Override + public Object put(Object key, Object value) { + stream.data = new DefaultProperyMap(DEFAULT_INITIAL_SIZE); + return stream.data.put(key, value); + } + + @Override + public V get(Object key) { + stream.data = new DefaultProperyMap(DEFAULT_INITIAL_SIZE); + return stream.data.get(key); + } + + @Override + public V remove(Object key) { + stream.data = new DefaultProperyMap(DEFAULT_INITIAL_SIZE); + return stream.data.remove(key); + } + } + private static IntObjectMap newChildMap() { return new IntObjectHashMap(4); } @@ -604,7 +662,7 @@ public class DefaultHttp2Connection implements Http2Connection { /** * Allows a correlation to be made between a stream and its old parent before a parent change occurs */ - private final class ParentChangedEvent { + private static final class ParentChangedEvent { private final Http2Stream stream; private final Http2Stream oldParent; diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java index 0781ef3ea0..89d8bc0fd9 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java @@ -66,6 +66,11 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { return this; } + @Override + public Http2LifecycleManager lifecycleManager() { + return lifecycleManager; + } + @Override public Builder inboundFlow(Http2InboundFlowController inboundFlow) { this.inboundFlow = inboundFlow; @@ -193,7 +198,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { } private static int unprocessedBytes(Http2Stream stream) { - return stream.inboundFlow().unProcessedBytes(); + return stream.garbageCollector().unProcessedBytes(); } /** @@ -284,7 +289,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { } finally { // If appropriate, returned the processed bytes to the flow controller. if (shouldApplyFlowControl && bytesToReturn > 0) { - stream.inboundFlow().returnProcessedBytes(ctx, bytesToReturn); + stream.garbageCollector().returnProcessedBytes(ctx, bytesToReturn); } if (endOfStream) { diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java index 233408a0ab..d8ce98fd91 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java @@ -60,6 +60,11 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { return this; } + @Override + public Http2LifecycleManager lifecycleManager() { + return lifecycleManager; + } + @Override public Builder frameWriter( Http2FrameWriter frameWriter) { diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java index 792f376b9d..787be36257 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java @@ -60,13 +60,18 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro this.windowUpdateRatio = windowUpdateRatio; // Add a flow state for the connection. - connection.connectionStream().inboundFlow(new FlowState(CONNECTION_STREAM_ID)); + final Http2Stream connectionStream = connection.connectionStream(); + final FlowState connectionFlowState = new FlowState(connectionStream); + connectionStream.inboundFlow(connectionFlowState); + connectionStream.garbageCollector(connectionFlowState); // Register for notification of new streams. connection.addListener(new Http2ConnectionAdapter() { @Override public void streamAdded(Http2Stream stream) { - stream.inboundFlow(new FlowState(stream.id())); + final FlowState flowState = new FlowState(stream); + stream.inboundFlow(flowState); + stream.garbageCollector(flowState); } }); } @@ -141,8 +146,8 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro /** * Flow control window state for an individual stream. */ - private final class FlowState implements Http2InboundFlowState { - private final int streamId; + private final class FlowState implements Http2FlowState, Http2FlowControlWindowManager { + private final Http2Stream stream; /** * The actual flow control window that is decremented as soon as {@code DATA} arrives. @@ -160,8 +165,8 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro private int lowerBound; private boolean endOfStream; - FlowState(int streamId) { - this.streamId = streamId; + FlowState(Http2Stream stream) { + this.stream = stream; window = initialWindowSize; processedWindow = window; } @@ -180,7 +185,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro */ int initialWindowSize() { int maxWindowSize = initialWindowSize; - if (streamId == CONNECTION_STREAM_ID) { + if (stream.id() == CONNECTION_STREAM_ID) { // Determine the maximum number of streams that we can allow without integer overflow // of maxWindowSize * numStreams. Also take care to avoid division by zero when // maxWindowSize == 0. @@ -196,7 +201,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro @Override public void returnProcessedBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception { - if (streamId == CONNECTION_STREAM_ID) { + if (stream.id() == CONNECTION_STREAM_ID) { throw new UnsupportedOperationException("Returning bytes for the connection window is not supported"); } checkNotNull(ctx, "ctx"); @@ -214,6 +219,11 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro return processedWindow - window; } + @Override + public Http2Stream stream() { + return stream; + } + /** * Updates the flow control window for this stream if it is appropriate. */ @@ -233,13 +243,12 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro */ void returnProcessedBytes(int delta) throws Http2Exception { if (processedWindow - delta < window) { - if (streamId == CONNECTION_STREAM_ID) { + if (stream.id() == CONNECTION_STREAM_ID) { throw new Http2Exception(INTERNAL_ERROR, "Attempting to return too many bytes for connection"); - } else { - throw new Http2StreamException(streamId, INTERNAL_ERROR, - "Attempting to return too many bytes for stream " + streamId); } + throw new Http2StreamException(stream.id(), INTERNAL_ERROR, + "Attempting to return too many bytes for stream " + stream.id()); } processedWindow -= delta; } @@ -264,10 +273,10 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro // This difference is stored for the connection when writing the SETTINGS frame // and is cleared once we send a WINDOW_UPDATE frame. if (delta < 0 && window < lowerBound) { - if (streamId == CONNECTION_STREAM_ID) { + if (stream.id() == CONNECTION_STREAM_ID) { throw protocolError("Connection flow control window exceeded"); } else { - throw flowControlError("Flow control window exceeded for stream: %d", streamId); + throw flowControlError("Flow control window exceeded for stream: %d", stream.id()); } } @@ -286,7 +295,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro void updatedInitialWindowSize(int delta) throws Http2Exception { if (delta > 0 && window > Integer.MAX_VALUE - delta) { // Integer overflow. - throw flowControlError("Flow control window overflowed for stream: %d", streamId); + throw flowControlError("Flow control window overflowed for stream: %d", stream.id()); } window += delta; processedWindow += delta; @@ -314,7 +323,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro } // Send a window update for the stream/connection. - frameWriter.writeWindowUpdate(ctx, streamId, deltaWindowSize, ctx.newPromise()); + frameWriter.writeWindowUpdate(ctx, stream.id(), deltaWindowSize, ctx.newPromise()); ctx.flush(); } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java index 817bbdbf7b..8f401e5983 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java @@ -29,6 +29,7 @@ import io.netty.handler.codec.AsciiString; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.compression.ZlibCodecFactory; import io.netty.handler.codec.compression.ZlibWrapper; +import io.netty.util.CharsetUtil; /** * A HTTP2 frame listener that will decompress data frames according to the {@code content-encoding} header for each @@ -38,7 +39,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() { @Override public void streamRemoved(Http2Stream stream) { - final EmbeddedChannel decompressor = stream.decompressor(); + final Http2Decompressor decompressor = stream.getProperty(Http2Decompressor.class); if (decompressor != null) { cleanup(stream, decompressor); } @@ -63,47 +64,59 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor @Override public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) - throws Http2Exception { + throws Http2Exception { final Http2Stream stream = connection.stream(streamId); - final EmbeddedChannel decompressor = stream == null ? null : stream.decompressor(); + final Http2Decompressor decompressor = stream == null ? null : + (Http2Decompressor) stream.getProperty(Http2Decompressor.class); if (decompressor == null) { // The decompressor may be null if no compatible encoding type was found in this stream's headers return listener.onDataRead(ctx, streamId, data, padding, endOfStream); } - // When decompressing, always opt-out of application-level flow control. - // TODO: investigate how to apply application-level flow control when decompressing. - int processedBytes = data.readableBytes() + padding; - try { - // call retain here as it will call release after its written to the channel - decompressor.writeInbound(data.retain()); - ByteBuf buf = nextReadableBuf(decompressor); - if (buf == null) { - if (endOfStream) { - listener.onDataRead(ctx, streamId, Unpooled.EMPTY_BUFFER, padding, true); - } - // END_STREAM is not set and the data could not be decoded yet. - // The assumption has to be there will be more data frames to complete the decode. - // We don't have enough information here to know if this is an error. - } else { - for (;;) { - final ByteBuf nextBuf = nextReadableBuf(decompressor); - final boolean endOfStreamForBuf = nextBuf == null && endOfStream; - - listener.onDataRead(ctx, streamId, buf, padding, endOfStreamForBuf); - if (nextBuf == null) { - break; - } - - buf = nextBuf; - } - } - return processedBytes; - } finally { + final EmbeddedChannel channel = decompressor.decompressor(); + final int compressedBytes = data.readableBytes() + padding; + int processedBytes = 0; + decompressor.incrementCompressedBytes(compressedBytes); + // call retain here as it will call release after its written to the channel + channel.writeInbound(data.retain()); + ByteBuf buf = nextReadableBuf(channel); + if (buf == null && endOfStream && channel.finish()) { + buf = nextReadableBuf(channel); + } + if (buf == null) { if (endOfStream) { - cleanup(stream, decompressor); + listener.onDataRead(ctx, streamId, Unpooled.EMPTY_BUFFER, padding, true); + } + // No new decompressed data was extracted from the compressed data. This means the application could not be + // provided with data and thus could not return how many bytes were processed. We will assume there is more + // data coming which will complete the decompression block. To allow for more data we return all bytes to + // the flow control window (so the peer can send more data). + decompressor.incrementDecompressedByes(compressedBytes); + processedBytes = compressedBytes; + } else { + decompressor.incrementDecompressedByes(padding); + for (;;) { + ByteBuf nextBuf = nextReadableBuf(channel); + boolean decompressedEndOfStream = nextBuf == null && endOfStream; + if (decompressedEndOfStream && channel.finish()) { + nextBuf = nextReadableBuf(channel); + decompressedEndOfStream = nextBuf == null; + } + + decompressor.incrementDecompressedByes(buf.readableBytes()); + processedBytes += listener.onDataRead(ctx, streamId, buf, padding, decompressedEndOfStream); + if (nextBuf == null) { + break; + } + + padding = 0; // Padding is only communicated once on the first iteration + buf = nextBuf; } } + + decompressor.incrementProcessedBytes(processedBytes); + // The processed bytes will be translated to pre-decompressed byte amounts by DecompressorGarbageCollector + return processedBytes; } @Override @@ -172,29 +185,27 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor return; } - EmbeddedChannel decompressor = stream.decompressor(); - if (decompressor == null) { - if (!endOfStream) { - // Determine the content encoding. - AsciiString contentEncoding = headers.get(CONTENT_ENCODING); - if (contentEncoding == null) { - contentEncoding = IDENTITY; - } - decompressor = newContentDecompressor(contentEncoding); - if (decompressor != null) { - stream.decompressor(decompressor); - // Decode the content and remove or replace the existing headers - // so that the message looks like a decoded message. - AsciiString targetContentEncoding = getTargetContentEncoding(contentEncoding); - if (IDENTITY.equalsIgnoreCase(targetContentEncoding)) { - headers.remove(CONTENT_ENCODING); - } else { - headers.set(CONTENT_ENCODING, targetContentEncoding); - } + Http2Decompressor decompressor = stream.getProperty(Http2Decompressor.class); + if (decompressor == null && !endOfStream) { + // Determine the content encoding. + AsciiString contentEncoding = headers.get(CONTENT_ENCODING); + if (contentEncoding == null) { + contentEncoding = IDENTITY; + } + final EmbeddedChannel channel = newContentDecompressor(contentEncoding); + if (channel != null) { + decompressor = new Http2Decompressor(channel); + stream.setProperty(Http2Decompressor.class, decompressor); + stream.garbageCollector(new DecompressorGarbageCollector(stream.garbageCollector())); + // Decode the content and remove or replace the existing headers + // so that the message looks like a decoded message. + AsciiString targetContentEncoding = getTargetContentEncoding(contentEncoding); + if (IDENTITY.equalsIgnoreCase(targetContentEncoding)) { + headers.remove(CONTENT_ENCODING); + } else { + headers.set(CONTENT_ENCODING, targetContentEncoding); } } - } else if (endOfStream) { - cleanup(stream, decompressor); } if (decompressor != null) { @@ -212,17 +223,22 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor * @param stream The stream for which {@code decompressor} is the decompressor for * @param decompressor The decompressor for {@code stream} */ - private static void cleanup(Http2Stream stream, EmbeddedChannel decompressor) { - if (decompressor.finish()) { + private static void cleanup(Http2Stream stream, Http2Decompressor decompressor) { + final EmbeddedChannel channel = decompressor.decompressor(); + if (channel.finish()) { for (;;) { - final ByteBuf buf = decompressor.readInbound(); + final ByteBuf buf = channel.readInbound(); if (buf == null) { break; } buf.release(); } } - stream.decompressor(null); + decompressor = stream.removeProperty(Http2Decompressor.class); + if (decompressor != null) { + DecompressorGarbageCollector gc = (DecompressorGarbageCollector) stream.garbageCollector(); + stream.garbageCollector(gc.original()); + } } /** @@ -245,4 +261,128 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor return buf; } } + + /** + * Garbage collector which translates post-decompression amounts the application knows about + * to pre-decompression amounts that flow control knows about. + */ + private static final class DecompressorGarbageCollector implements Http2FlowControlWindowManager { + private final Http2FlowControlWindowManager original; + + DecompressorGarbageCollector(Http2FlowControlWindowManager original) { + this.original = original; + } + + @Override + public void returnProcessedBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception { + final Http2Stream stream = stream(); + final Http2Decompressor decompressor = stream.getProperty(Http2Decompressor.class); + + // Make a copy before hand in case any exceptions occur we will roll back the state + Http2Decompressor copy = new Http2Decompressor(decompressor); + try { + original.returnProcessedBytes(ctx, decompressor.consumeProcessedBytes(numBytes)); + } catch (Http2Exception e) { + stream.setProperty(Http2Decompressor.class, copy); + throw e; + } catch (Throwable t) { + stream.setProperty(Http2Decompressor.class, copy); + throw new Http2Exception(Http2Error.INTERNAL_ERROR, + "Error while returning bytes to flow control window", t); + } + } + + Http2FlowControlWindowManager original() { + return original; + } + + @Override + public int unProcessedBytes() { + return original.unProcessedBytes(); + } + + @Override + public Http2Stream stream() { + return original.stream(); + } + } + + /** + * Provides the state for stream {@code DATA} frame decompression. + */ + private static final class Http2Decompressor { + private final EmbeddedChannel decompressor; + private int processed; + private int compressed; + private int decompressed; + + Http2Decompressor(Http2Decompressor rhs) { + this(rhs.decompressor); + processed = rhs.processed; + compressed = rhs.compressed; + decompressed = rhs.decompressed; + } + + Http2Decompressor(EmbeddedChannel decompressor) { + this.decompressor = decompressor; + } + + /** + * Responsible for taking compressed bytes in and producing decompressed bytes. + */ + EmbeddedChannel decompressor() { + return decompressor; + } + + /** + * Increment the number of decompressed bytes processed by the application. + */ + void incrementProcessedBytes(int delta) { + if (processed + delta < 0) { + throw new IllegalArgumentException("processed bytes cannot be negative"); + } + processed += delta; + } + + /** + * Increment the number of bytes received prior to doing any decompression. + */ + void incrementCompressedBytes(int delta) { + if (compressed + delta < 0) { + throw new IllegalArgumentException("compressed bytes cannot be negative"); + } + compressed += delta; + } + + /** + * Increment the number of bytes after the decompression process. Under normal circumstances this + * delta should not exceed {@link Http2Decompressor#processedBytes()}. + */ + void incrementDecompressedByes(int delta) { + if (decompressed + delta < 0) { + throw new IllegalArgumentException("decompressed bytes cannot be negative"); + } + decompressed += delta; + } + + /** + * Decrements {@link Http2Decompressor#processedBytes()} by {@code processedBytes} and determines the ratio + * between {@code processedBytes} and {@link Http2Decompressor#decompressedBytes()}. + * This ratio is used to decrement {@link Http2Decompressor#decompressedBytes()} and + * {@link Http2Decompressor#compressedBytes()}. + * @param processedBytes The number of post-decompressed bytes that have been processed. + * @return The number of pre-decompressed bytes that have been consumed. + */ + int consumeProcessedBytes(int processedBytes) { + // Consume the processed bytes first to verify that is is a valid amount + incrementProcessedBytes(-processedBytes); + + double consumedRatio = processedBytes / (double) decompressed; + int consumedCompressed = Math.min(compressed, (int) Math.ceil(compressed * consumedRatio)); + incrementDecompressedByes(-Math.min(decompressed, (int) Math.ceil(decompressed * consumedRatio))); + incrementCompressedBytes(-consumedCompressed); + + return consumedCompressed; + } + } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionDecoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionDecoder.java index 3d95122c97..606591b97e 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionDecoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionDecoder.java @@ -43,6 +43,11 @@ public interface Http2ConnectionDecoder extends Closeable { */ Builder lifecycleManager(Http2LifecycleManager lifecycleManager); + /** + * Gets the {@link Http2LifecycleManager} to be used when building the decoder. + */ + Http2LifecycleManager lifecycleManager(); + /** * Sets the {@link Http2InboundFlowController} to be used when building the decoder. */ diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionEncoder.java index ba73097fbb..56f8f2015a 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionEncoder.java @@ -40,6 +40,11 @@ public interface Http2ConnectionEncoder extends Http2FrameWriter, Http2OutboundF */ Builder lifecycleManager(Http2LifecycleManager lifecycleManager); + /** + * Gets the {@link Http2LifecycleManager} to be used when building the encoder. + */ + Http2LifecycleManager lifecycleManager(); + /** * Sets the {@link Http2FrameWriter} to be used when building the encoder. */ diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index 04824dad94..c8d1ed65d3 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -79,13 +79,18 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http checkNotNull(decoderBuilder, "decoderBuilder"); checkNotNull(encoderBuilder, "encoderBuilder"); + if (encoderBuilder.lifecycleManager() != decoderBuilder.lifecycleManager()) { + throw new IllegalArgumentException("Encoder and Decoder must share a lifecycle manager"); + } else if (encoderBuilder.lifecycleManager() == null) { + encoderBuilder.lifecycleManager(this); + decoderBuilder.lifecycleManager(this); + } + // Build the encoder. - encoderBuilder.lifecycleManager(this); encoder = checkNotNull(encoderBuilder.build(), "encoder"); // Build the decoder. decoderBuilder.encoder(encoder); - decoderBuilder.lifecycleManager(this); decoder = checkNotNull(decoderBuilder.build(), "decoder"); // Verify that the encoder and decoder use the same connection. @@ -146,8 +151,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - // The channel just became active - send the connection preface to the remote - // endpoint. + // The channel just became active - send the connection preface to the remote endpoint. sendPreface(ctx); super.channelActive(ctx); } @@ -166,7 +170,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - // Avoid NotYetConnectedException + // Avoid NotYetConnectedException if (!ctx.channel().isActive()) { ctx.close(promise); return; diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowState.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FlowControlWindowManager.java similarity index 88% rename from codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowState.java rename to codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FlowControlWindowManager.java index 5b9306724d..17e82fe74f 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowState.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FlowControlWindowManager.java @@ -17,11 +17,9 @@ package io.netty.handler.codec.http2; import io.netty.channel.ChannelHandlerContext; /** - * The inbound flow control state for a stream. This object is created and managed by the - * {@link Http2InboundFlowController}. + * Allows data to be returned to the flow control window. */ -public interface Http2InboundFlowState extends Http2FlowState { - +public interface Http2FlowControlWindowManager { /** * Used by applications that participate in application-level inbound flow control. Allows the * application to return a number of bytes that has been processed and thereby enabling the @@ -38,4 +36,9 @@ public interface Http2InboundFlowState extends Http2FlowState { * The number of bytes that are outstanding and have not yet been returned to the flow controller. */ int unProcessedBytes(); + + /** + * Get the stream that is being managed + */ + Http2Stream stream(); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java index 029c919d37..8f894227b1 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java @@ -15,8 +15,6 @@ package io.netty.handler.codec.http2; -import io.netty.channel.embedded.EmbeddedChannel; - import java.util.Collection; /** @@ -135,43 +133,29 @@ public interface Http2Stream { /** * Associates the application-defined data with this stream. + * @return The value that was previously associated with {@code key}, or {@code null} if there was none. */ - void data(Object data); + Object setProperty(Object key, Object value); /** * Returns application-defined data if any was associated with this stream. */ - T data(); + V getProperty(Object key); /** - * Associate an object responsible for decompressing data frames for this stream + * Returns and removes application-defined data if any was associated with this stream. */ - void decompressor(EmbeddedChannel decompressor); - - /** - * Get the object capable of decompressing data frames for this stream - */ - EmbeddedChannel decompressor(); - - /** - * Associate an object responsible for compressing data frames for this stream - */ - void compressor(EmbeddedChannel decompressor); - - /** - * Get the object capable of compressing data frames for this stream - */ - EmbeddedChannel compressor(); + V removeProperty(Object key); /** * Gets the in-bound flow control state for this stream. */ - Http2InboundFlowState inboundFlow(); + Http2FlowState inboundFlow(); /** * Sets the in-bound flow control state for this stream. */ - void inboundFlow(Http2InboundFlowState state); + void inboundFlow(Http2FlowState state); /** * Gets the out-bound flow control window for this stream. @@ -183,6 +167,16 @@ public interface Http2Stream { */ void outboundFlow(Http2FlowState state); + /** + * Gets the interface which allows bytes to be returned to the flow controller + */ + Http2FlowControlWindowManager garbageCollector(); + + /** + * Sets the interface which allows bytes to be returned to the flow controller + */ + void garbageCollector(Http2FlowControlWindowManager collector); + /** * Updates an priority for this stream. Calling this method may affect the straucture of the * priority tree. diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DataCompressionHttp2Test.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DataCompressionHttp2Test.java index 6a7958f820..b7f3125101 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DataCompressionHttp2Test.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DataCompressionHttp2Test.java @@ -19,47 +19,50 @@ import static io.netty.handler.codec.http2.Http2TestUtil.as; import static io.netty.handler.codec.http2.Http2TestUtil.runInChannel; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.times; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.verify; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; -import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; -import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.AsciiString; -import io.netty.handler.codec.compression.ZlibCodecFactory; -import io.netty.handler.codec.compression.ZlibWrapper; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http2.Http2TestUtil.FrameAdapter; +import io.netty.handler.codec.http2.Http2TestUtil.FrameCountDown; import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable; +import io.netty.util.CharsetUtil; import io.netty.util.NetUtil; import io.netty.util.concurrent.Future; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.net.InetSocketAddress; -import java.util.List; +import java.util.Random; import java.util.concurrent.CountDownLatch; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; /** * Test for data decompression in the HTTP/2 codec. @@ -68,18 +71,12 @@ public class DataCompressionHttp2Test { private static final AsciiString GET = as("GET"); private static final AsciiString POST = as("POST"); private static final AsciiString PATH = as("/some/path"); - private List dataCapture; @Mock private Http2FrameListener serverListener; @Mock private Http2FrameListener clientListener; - @Mock - private Http2LifecycleManager serverLifeCycleManager; - @Mock - private Http2LifecycleManager clientLifeCycleManager; - private ByteBufAllocator alloc; private Http2ConnectionEncoder serverEncoder; private Http2ConnectionEncoder clientEncoder; private ServerBootstrap sb; @@ -89,23 +86,22 @@ public class DataCompressionHttp2Test { private Channel clientChannel; private volatile CountDownLatch serverLatch; private volatile CountDownLatch clientLatch; - private FrameAdapter serverAdapter; - private FrameAdapter clientAdapter; private Http2Connection serverConnection; + private Http2Connection clientConnection; + private ByteArrayOutputStream serverOut; @Before public void setup() throws InterruptedException { MockitoAnnotations.initMocks(this); } + @After + public void cleaup() throws IOException { + serverOut.close(); + } + @After public void teardown() throws InterruptedException { - if (dataCapture != null) { - for (int i = 0; i < dataCapture.size(); ++i) { - dataCapture.get(i).release(); - } - dataCapture = null; - } serverChannel.close().sync(); Future serverGroup = sb.group().shutdownGracefully(0, 0, MILLISECONDS); Future serverChildGroup = sb.childGroup().shutdownGracefully(0, 0, MILLISECONDS); @@ -113,19 +109,18 @@ public class DataCompressionHttp2Test { serverGroup.sync(); serverChildGroup.sync(); clientGroup.sync(); - serverAdapter = null; - clientAdapter = null; - serverConnection = null; } @Test public void justHeadersNoData() throws Exception { - bootstrapEnv(1, 1); + bootstrapEnv(1, 0, 1); final Http2Headers headers = new DefaultHttp2Headers().method(GET).path(PATH) .set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP); + // Required because the decompressor intercepts the onXXXRead events before // our {@link Http2TestUtil$FrameAdapter} does. FrameAdapter.getOrCreateStream(serverConnection, 3, false); + FrameAdapter.getOrCreateStream(clientConnection, 3, false); runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { @@ -140,131 +135,109 @@ public class DataCompressionHttp2Test { @Test public void gzipEncodingSingleEmptyMessage() throws Exception { - bootstrapEnv(2, 1); final String text = ""; final ByteBuf data = Unpooled.copiedBuffer(text.getBytes()); - final EmbeddedChannel encoder = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP)); + bootstrapEnv(1, data.readableBytes(), 1); try { - final ByteBuf encodedData = encodeData(data, encoder); final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH) .set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP); + // Required because the decompressor intercepts the onXXXRead events before // our {@link Http2TestUtil$FrameAdapter} does. - FrameAdapter.getOrCreateStream(serverConnection, 3, false); + Http2Stream stream = FrameAdapter.getOrCreateStream(serverConnection, 3, false); + FrameAdapter.getOrCreateStream(clientConnection, 3, false); runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient()); - clientEncoder.writeData(ctxClient(), 3, encodedData, 0, true, newPromiseClient()); + clientEncoder.writeData(ctxClient(), 3, data, 0, true, newPromiseClient()); ctxClient().flush(); } }); awaitServer(); - data.resetReaderIndex(); - ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(ByteBuf.class); - verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), eq(0), - eq(true)); - dataCapture = dataCaptor.getAllValues(); - assertEquals(data, dataCapture.get(0)); + assertEquals(0, stream.garbageCollector().unProcessedBytes()); + assertEquals(text, serverOut.toString(CharsetUtil.UTF_8.name())); } finally { data.release(); - cleanupEncoder(encoder); } } @Test public void gzipEncodingSingleMessage() throws Exception { - bootstrapEnv(2, 1); final String text = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbccccccccccccccccccccccc"; final ByteBuf data = Unpooled.copiedBuffer(text.getBytes()); - final EmbeddedChannel encoder = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP)); + bootstrapEnv(1, data.readableBytes(), 1); try { - final ByteBuf encodedData = encodeData(data, encoder); final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH) .set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP); + // Required because the decompressor intercepts the onXXXRead events before // our {@link Http2TestUtil$FrameAdapter} does. - FrameAdapter.getOrCreateStream(serverConnection, 3, false); + Http2Stream stream = FrameAdapter.getOrCreateStream(serverConnection, 3, false); + FrameAdapter.getOrCreateStream(clientConnection, 3, false); runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient()); - clientEncoder.writeData(ctxClient(), 3, encodedData, 0, true, newPromiseClient()); + clientEncoder.writeData(ctxClient(), 3, data, 0, true, newPromiseClient()); ctxClient().flush(); } }); awaitServer(); - data.resetReaderIndex(); - ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(ByteBuf.class); - verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), eq(0), - eq(true)); - dataCapture = dataCaptor.getAllValues(); - assertEquals(data, dataCapture.get(0)); + assertEquals(0, stream.garbageCollector().unProcessedBytes()); + assertEquals(text, serverOut.toString(CharsetUtil.UTF_8.name())); } finally { data.release(); - cleanupEncoder(encoder); } } @Test public void gzipEncodingMultipleMessages() throws Exception { - bootstrapEnv(3, 1); final String text1 = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbccccccccccccccccccccccc"; final String text2 = "dddddddddddddddddddeeeeeeeeeeeeeeeeeeeffffffffffffffffffff"; final ByteBuf data1 = Unpooled.copiedBuffer(text1.getBytes()); final ByteBuf data2 = Unpooled.copiedBuffer(text2.getBytes()); - final EmbeddedChannel encoder = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP)); + bootstrapEnv(1, data1.readableBytes() + data2.readableBytes(), 1); try { - final ByteBuf encodedData1 = encodeData(data1, encoder); - final ByteBuf encodedData2 = encodeData(data2, encoder); final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH) .set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP); + // Required because the decompressor intercepts the onXXXRead events before // our {@link Http2TestUtil$FrameAdapter} does. - FrameAdapter.getOrCreateStream(serverConnection, 3, false); + Http2Stream stream = FrameAdapter.getOrCreateStream(serverConnection, 3, false); + FrameAdapter.getOrCreateStream(clientConnection, 3, false); runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient()); - clientEncoder.writeData(ctxClient(), 3, encodedData1, 0, false, newPromiseClient()); - clientEncoder.writeData(ctxClient(), 3, encodedData2, 0, true, newPromiseClient()); + clientEncoder.writeData(ctxClient(), 3, data1, 0, false, newPromiseClient()); + clientEncoder.writeData(ctxClient(), 3, data2, 0, true, newPromiseClient()); ctxClient().flush(); } }); awaitServer(); - data1.resetReaderIndex(); - data2.resetReaderIndex(); - ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(ByteBuf.class); - ArgumentCaptor endStreamCaptor = ArgumentCaptor.forClass(Boolean.class); - verify(serverListener, times(2)).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), - eq(0), endStreamCaptor.capture()); - dataCapture = dataCaptor.getAllValues(); - assertEquals(data1, dataCapture.get(0)); - assertEquals(data2, dataCapture.get(1)); - List endStreamCapture = endStreamCaptor.getAllValues(); - assertFalse(endStreamCapture.get(0)); - assertTrue(endStreamCapture.get(1)); + assertEquals(0, stream.garbageCollector().unProcessedBytes()); + assertEquals(new StringBuilder(text1).append(text2).toString(), + serverOut.toString(CharsetUtil.UTF_8.name())); } finally { data1.release(); data2.release(); - cleanupEncoder(encoder); } } @Test public void deflateEncodingSingleLargeMessageReducedWindow() throws Exception { - bootstrapEnv(3, 1); final int BUFFER_SIZE = 1 << 16; + bootstrapEnv(1, BUFFER_SIZE, 1); final ByteBuf data = Unpooled.buffer(BUFFER_SIZE); - final EmbeddedChannel encoder = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.ZLIB)); try { for (int i = 0; i < data.capacity(); ++i) { data.writeByte((byte) 'a'); } - final ByteBuf encodedData = encodeData(data, encoder); final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH) .set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.DEFLATE); final Http2Settings settings = new Http2Settings(); + // Assume the compression operation will reduce the size by at least 10 bytes settings.initialWindowSize(BUFFER_SIZE - 10); runInChannel(serverConnectedChannel, new Http2Runnable() { @@ -278,86 +251,123 @@ public class DataCompressionHttp2Test { // Required because the decompressor intercepts the onXXXRead events before // our {@link Http2TestUtil$FrameAdapter} does. - FrameAdapter.getOrCreateStream(serverConnection, 3, false); + Http2Stream stream = FrameAdapter.getOrCreateStream(serverConnection, 3, false); + FrameAdapter.getOrCreateStream(clientConnection, 3, false); runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { clientEncoder.writeSettings(ctxClient(), settings, newPromiseClient()); clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient()); - clientEncoder.writeData(ctxClient(), 3, encodedData, 0, true, newPromiseClient()); + clientEncoder.writeData(ctxClient(), 3, data, 0, true, newPromiseClient()); ctxClient().flush(); } }); awaitServer(); - data.resetReaderIndex(); - ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(ByteBuf.class); - verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), eq(0), - eq(true)); - dataCapture = dataCaptor.getAllValues(); - assertEquals(data, dataCapture.get(0)); + assertEquals(0, stream.garbageCollector().unProcessedBytes()); + assertEquals(data.resetReaderIndex().toString(CharsetUtil.UTF_8), + serverOut.toString(CharsetUtil.UTF_8.name())); } finally { data.release(); - cleanupEncoder(encoder); } } - private ByteBuf encodeData(ByteBuf data, EmbeddedChannel encoder) { - ByteBuf encoded = alloc.buffer(data.readableBytes()); - encoder.writeOutbound(data.retain()); - for (;;) { - final ByteBuf buf = encoder.readOutbound(); - if (buf == null) { - break; - } - if (!buf.isReadable()) { - buf.release(); - continue; - } - encoded.writeBytes(buf); - buf.release(); - } - return encoded; - } + @Test + public void deflateEncodingMultipleWriteLargeMessageReducedWindow() throws Exception { + final int BUFFER_SIZE = 1 << 12; + final byte[] bytes = new byte[BUFFER_SIZE]; + new Random().nextBytes(bytes); + bootstrapEnv(1, BUFFER_SIZE, 1); + final ByteBuf data = Unpooled.wrappedBuffer(bytes); + try { + final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH) + .set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.DEFLATE); + final Http2Settings settings = new Http2Settings(); - private static void cleanupEncoder(EmbeddedChannel encoder) { - if (encoder.finish()) { - for (;;) { - final ByteBuf buf = encoder.readOutbound(); - if (buf == null) { - break; + settings.initialWindowSize(BUFFER_SIZE / 2); + runInChannel(serverConnectedChannel, new Http2Runnable() { + @Override + public void run() { + serverEncoder.writeSettings(ctxServer(), settings, newPromiseServer()); + ctxServer().flush(); } - buf.release(); - } + }); + awaitClient(); + + // Required because the decompressor intercepts the onXXXRead events before + // our {@link Http2TestUtil$FrameAdapter} does. + Http2Stream stream = FrameAdapter.getOrCreateStream(serverConnection, 3, false); + FrameAdapter.getOrCreateStream(clientConnection, 3, false); + runInChannel(clientChannel, new Http2Runnable() { + @Override + public void run() { + clientEncoder.writeSettings(ctxClient(), settings, newPromiseClient()); + clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient()); + clientEncoder.writeData(ctxClient(), 3, data, 0, true, newPromiseClient()); + ctxClient().flush(); + } + }); + awaitServer(); + assertEquals(0, stream.garbageCollector().unProcessedBytes()); + assertEquals(data.resetReaderIndex().toString(CharsetUtil.UTF_8), + serverOut.toString(CharsetUtil.UTF_8.name())); + } finally { + data.release(); } } - private void bootstrapEnv(int serverCountDown, int clientCountDown) throws Exception { - alloc = UnpooledByteBufAllocator.DEFAULT; + private void bootstrapEnv(int serverHalfClosedCount, int serverOutSize, int clientCount) throws Exception { + serverOut = new ByteArrayOutputStream(serverOutSize); + serverLatch = new CountDownLatch(serverHalfClosedCount); + clientLatch = new CountDownLatch(clientCount); sb = new ServerBootstrap(); cb = new Bootstrap(); - serverLatch = new CountDownLatch(serverCountDown); - clientLatch = new CountDownLatch(clientCountDown); + // Streams are created before the normal flow for this test, so these connection must be initialized up front. serverConnection = new DefaultHttp2Connection(true); + clientConnection = new DefaultHttp2Connection(false); - final CountDownLatch latch = new CountDownLatch(1); + serverConnection.addListener(new Http2ConnectionAdapter() { + @Override + public void streamHalfClosed(Http2Stream stream) { + serverLatch.countDown(); + } + }); + + doAnswer(new Answer() { + @Override + public Integer answer(InvocationOnMock in) throws Throwable { + ByteBuf buf = (ByteBuf) in.getArguments()[2]; + int padding = (Integer) in.getArguments()[3]; + int processedBytes = buf.readableBytes() + padding; + + buf.readBytes(serverOut, buf.readableBytes()); + buf.release(); + return processedBytes; + } + }).when(serverListener).onDataRead(any(ChannelHandlerContext.class), anyInt(), + any(ByteBuf.class), anyInt(), anyBoolean()); + + final CountDownLatch serverChannelLatch = new CountDownLatch(1); sb.group(new NioEventLoopGroup(), new NioEventLoopGroup()); sb.channel(NioServerSocketChannel.class); sb.childHandler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline p = ch.pipeline(); - CompressorHttp2ConnectionEncoder.Builder builder = new CompressorHttp2ConnectionEncoder.Builder(); Http2FrameWriter writer = new DefaultHttp2FrameWriter(); - serverEncoder = builder.connection(serverConnection).frameWriter(writer) - .outboundFlow(new DefaultHttp2OutboundFlowController(serverConnection, writer)) - .lifecycleManager(serverLifeCycleManager).build(); - serverAdapter = new FrameAdapter(serverConnection, new DelegatingDecompressorFrameListener( - serverConnection, serverListener), serverLatch); - p.addLast("reader", serverAdapter); - p.addLast(Http2CodecUtil.ignoreSettingsHandler()); + Http2ConnectionHandler connectionHandler = new Http2ConnectionHandler( + new DefaultHttp2ConnectionDecoder.Builder() + .connection(serverConnection) + .frameReader(new DefaultHttp2FrameReader()) + .inboundFlow(new DefaultHttp2InboundFlowController(serverConnection, writer)) + .listener(new DelegatingDecompressorFrameListener(serverConnection, serverListener)), + new CompressorHttp2ConnectionEncoder.Builder().connection(serverConnection).frameWriter(writer) + .outboundFlow(new DefaultHttp2OutboundFlowController(serverConnection, writer))); + serverEncoder = connectionHandler.encoder(); serverConnectedChannel = ch; - latch.countDown(); + p.addLast(connectionHandler); + p.addLast(Http2CodecUtil.ignoreSettingsHandler()); + serverChannelLatch.countDown(); } }); @@ -367,14 +377,18 @@ public class DataCompressionHttp2Test { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline p = ch.pipeline(); - Http2Connection connection = new DefaultHttp2Connection(false); + FrameCountDown clientFrameCountDown = new FrameCountDown(clientListener, clientLatch); Http2FrameWriter writer = new DefaultHttp2FrameWriter(); - CompressorHttp2ConnectionEncoder.Builder builder = new CompressorHttp2ConnectionEncoder.Builder(); - clientEncoder = builder.connection(connection).frameWriter(writer) - .outboundFlow(new DefaultHttp2OutboundFlowController(connection, writer)) - .lifecycleManager(clientLifeCycleManager).build(); - clientAdapter = new FrameAdapter(connection, clientListener, clientLatch); - p.addLast("reader", clientAdapter); + Http2ConnectionHandler connectionHandler = new Http2ConnectionHandler( + new DefaultHttp2ConnectionDecoder.Builder() + .connection(clientConnection) + .frameReader(new DefaultHttp2FrameReader()) + .inboundFlow(new DefaultHttp2InboundFlowController(clientConnection, writer)) + .listener(new DelegatingDecompressorFrameListener(clientConnection, clientFrameCountDown)), + new CompressorHttp2ConnectionEncoder.Builder().connection(clientConnection).frameWriter(writer) + .outboundFlow(new DefaultHttp2OutboundFlowController(clientConnection, writer))); + clientEncoder = connectionHandler.encoder(); + p.addLast(connectionHandler); p.addLast(Http2CodecUtil.ignoreSettingsHandler()); } }); @@ -385,16 +399,16 @@ public class DataCompressionHttp2Test { ChannelFuture ccf = cb.connect(new InetSocketAddress(NetUtil.LOCALHOST, port)); assertTrue(ccf.awaitUninterruptibly().isSuccess()); clientChannel = ccf.channel(); - - assertTrue(latch.await(5, SECONDS)); + assertTrue(serverChannelLatch.await(5, SECONDS)); } private void awaitClient() throws Exception { - clientLatch.await(5, SECONDS); + assertTrue(clientLatch.await(5, SECONDS)); } private void awaitServer() throws Exception { - serverLatch.await(5, SECONDS); + assertTrue(serverLatch.await(5, SECONDS)); + serverOut.flush(); } private ChannelHandlerContext ctxClient() { diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java index 365f4b68d4..f23ac20663 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java @@ -105,7 +105,7 @@ public class DefaultHttp2ConnectionDecoderTest { private Http2ConnectionEncoder encoder; @Mock - private Http2InboundFlowState inFlowState; + private Http2FlowControlWindowManager inFlowState; @Mock private Http2LifecycleManager lifecycleManager; @@ -119,7 +119,7 @@ public class DefaultHttp2ConnectionDecoderTest { when(channel.isActive()).thenReturn(true); when(stream.id()).thenReturn(STREAM_ID); when(stream.state()).thenReturn(OPEN); - when(stream.inboundFlow()).thenReturn(inFlowState); + when(stream.garbageCollector()).thenReturn(inFlowState); when(pushStream.id()).thenReturn(PUSH_STREAM_ID); when(connection.activeStreams()).thenReturn(Collections.singletonList(stream)); when(connection.stream(STREAM_ID)).thenReturn(stream); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java index 90184c1b49..85380423cc 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java @@ -219,7 +219,7 @@ public class DefaultHttp2InboundFlowControllerTest { } private void returnProcessedBytes(int streamId, int processedBytes) throws Http2Exception { - connection.requireStream(streamId).inboundFlow().returnProcessedBytes(ctx, processedBytes); + connection.requireStream(streamId).garbageCollector().returnProcessedBytes(ctx, processedBytes); } private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) throws Http2Exception {