diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/CompressorHttp2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/CompressorHttp2ConnectionEncoder.java index 609a39474d..0a1e2e6b3c 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/CompressorHttp2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/CompressorHttp2ConnectionEncoder.java @@ -38,16 +38,6 @@ import io.netty.util.ByteString; * stream. The compression provided by this class will be applied to the data for the entire stream. */ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder { - private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() { - @Override - public void onStreamRemoved(Http2Stream stream) { - final EmbeddedChannel compressor = stream.getProperty(CompressorHttp2ConnectionEncoder.class); - if (compressor != null) { - cleanup(stream, compressor); - } - } - }; - public static final int DEFAULT_COMPRESSION_LEVEL = 6; public static final int DEFAULT_WINDOW_BITS = 15; public static final int DEFAULT_MEM_LEVEL = 8; @@ -55,6 +45,7 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE private final int compressionLevel; private final int windowBits; private final int memLevel; + private final Http2Connection.PropertyKey propertyKey; public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate) { this(delegate, DEFAULT_COMPRESSION_LEVEL, DEFAULT_WINDOW_BITS, DEFAULT_MEM_LEVEL); @@ -76,15 +67,23 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE this.windowBits = windowBits; this.memLevel = memLevel; - connection().addListener(CLEAN_UP_LISTENER); + propertyKey = connection().newKey(); + connection().addListener(new Http2ConnectionAdapter() { + @Override + public void onStreamRemoved(Http2Stream stream) { + final EmbeddedChannel compressor = stream.getProperty(propertyKey); + if (compressor != null) { + cleanup(stream, compressor); + } + } + }); } @Override public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding, final boolean endOfStream, ChannelPromise promise) { final Http2Stream stream = connection().stream(streamId); - final EmbeddedChannel channel = stream == null ? null : - (EmbeddedChannel) stream.getProperty(CompressorHttp2ConnectionEncoder.class); + final EmbeddedChannel channel = stream == null ? null : (EmbeddedChannel) stream.getProperty(propertyKey); if (channel == null) { // The compressor may be null if no compatible encoding type was found in this stream's headers return super.writeData(ctx, streamId, data, padding, endOfStream, promise); @@ -216,7 +215,7 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE return; } - EmbeddedChannel compressor = stream.getProperty(CompressorHttp2ConnectionEncoder.class); + EmbeddedChannel compressor = stream.getProperty(propertyKey); if (compressor == null) { if (!endOfStream) { ByteString encoding = headers.get(CONTENT_ENCODING); @@ -226,7 +225,7 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE try { compressor = newContentCompressor(encoding); if (compressor != null) { - stream.setProperty(CompressorHttp2ConnectionEncoder.class, compressor); + stream.setProperty(propertyKey, compressor); ByteString targetContentEncoding = getTargetContentEncoding(encoding); if (IDENTITY.equals(targetContentEncoding)) { headers.remove(CONTENT_ENCODING); @@ -256,7 +255,7 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE * @param stream The stream for which {@code compressor} is the compressor for * @param compressor The compressor for {@code stream} */ - private static void cleanup(Http2Stream stream, EmbeddedChannel compressor) { + void cleanup(Http2Stream stream, EmbeddedChannel compressor) { if (compressor.finish()) { for (;;) { final ByteBuf buf = compressor.readOutbound(); @@ -267,7 +266,7 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE buf.release(); } } - stream.removeProperty(CompressorHttp2ConnectionEncoder.class); + stream.removeProperty(propertyKey); } /** diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java index 257471307b..8cc03183ea 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java @@ -37,6 +37,7 @@ import io.netty.handler.codec.http2.Http2Stream.State; import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectMap; import io.netty.util.collection.PrimitiveCollections; +import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; @@ -44,11 +45,10 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.HashMap; +import java.util.Arrays; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; -import java.util.Map; import java.util.Queue; import java.util.Set; @@ -59,9 +59,11 @@ public class DefaultHttp2Connection implements Http2Connection { private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2Connection.class); // Fields accessed by inner classes final IntObjectMap streamMap = new IntObjectHashMap(); + final PropertyKeyRegistry propertyKeyRegistry = new PropertyKeyRegistry(); final ConnectionStream connectionStream = new ConnectionStream(); final DefaultEndpoint localEndpoint; final DefaultEndpoint remoteEndpoint; + /** * The initial size of the children map is chosen to be conservative on initial memory allocations under * the assumption that most streams will have a small number of children. This choice may be @@ -265,11 +267,28 @@ public class DefaultHttp2Connection implements Http2Connection { } } + @Override + public PropertyKey newKey() { + return propertyKeyRegistry.newKey(); + } + + /** + * Verifies that the key is valid and returns it as the internal {@link DefaultPropertyKey} type. + * + * @throws NullPointerException if the key is {@code null}. + * @throws ClassCastException if the key is not of type {@link DefaultPropertyKey}. + * @throws IllegalArgumentException if the key was not created by this connection. + */ + final DefaultPropertyKey verifyKey(PropertyKey key) { + return checkNotNull((DefaultPropertyKey) key, "key").verifyConnection(this); + } + /** * Simple stream implementation. Streams can be compared to each other by priority. */ private class DefaultStream implements Http2Stream { private final int id; + private final PropertyMap properties = new PropertyMap(); private State state; private short weight = DEFAULT_PRIORITY_WEIGHT; private DefaultStream parent; @@ -277,14 +296,10 @@ public class DefaultHttp2Connection implements Http2Connection { private int totalChildWeights; private int prioritizableForTree = 1; private boolean resetSent; - private PropertyMap data; - private FlowControlState localFlowState; - private FlowControlState remoteFlowState; DefaultStream(int id, State state) { this.id = id; this.state = state; - data = new LazyPropertyMap(this); } @Override @@ -297,26 +312,6 @@ public class DefaultHttp2Connection implements Http2Connection { return state; } - @Override - public FlowControlState localFlowState() { - return localFlowState; - } - - @Override - public void localFlowState(FlowControlState state) { - localFlowState = state; - } - - @Override - public FlowControlState remoteFlowState() { - return remoteFlowState; - } - - @Override - public void remoteFlowState(FlowControlState state) { - remoteFlowState = state; - } - @Override public boolean isResetSent() { return resetSent; @@ -329,18 +324,18 @@ public class DefaultHttp2Connection implements Http2Connection { } @Override - public final Object setProperty(Object key, Object value) { - return data.put(key, value); + public final V setProperty(PropertyKey key, V value) { + return properties.add(verifyKey(key), value); } @Override - public final V getProperty(Object key) { - return data.get(key); + public final V getProperty(PropertyKey key) { + return properties.get(verifyKey(key)); } @Override - public final V removeProperty(Object key) { - return data.remove(key); + public final V removeProperty(PropertyKey key) { + return properties.remove(verifyKey(key)); } @Override @@ -694,74 +689,44 @@ public class DefaultHttp2Connection implements Http2Connection { } return false; } - } - /** - * Allows the data map to be lazily initialized for {@link DefaultStream}. - */ - private interface PropertyMap { - Object put(Object key, Object value); + /** + * Provides the lazy initialization for the {@link DefaultStream} data map. + */ + private class PropertyMap { + Object[] values = EmptyArrays.EMPTY_OBJECTS; - V get(Object key); + V add(DefaultPropertyKey key, V value) { + resizeIfNecessary(key.index); + @SuppressWarnings("unchecked") + V prevValue = (V) values[key.index]; + values[key.index] = value; + return prevValue; + } - V remove(Object key); - } + @SuppressWarnings("unchecked") + V get(DefaultPropertyKey key) { + if (key.index >= values.length) { + return null; + } + return (V) values[key.index]; + } - /** - * Provides actual {@link HashMap} functionality for {@link DefaultStream}'s application data. - */ - private static final class DefaultProperyMap implements PropertyMap { - private final Map data; + @SuppressWarnings("unchecked") + V remove(DefaultPropertyKey key) { + V prevValue = null; + if (key.index < values.length) { + prevValue = (V) values[key.index]; + values[key.index] = null; + } + return prevValue; + } - DefaultProperyMap(int initialSize) { - data = new HashMap(initialSize); - } - - @Override - public Object put(Object key, Object value) { - return data.put(key, value); - } - - @SuppressWarnings("unchecked") - @Override - public V get(Object key) { - return (V) data.get(key); - } - - @SuppressWarnings("unchecked") - @Override - public V remove(Object key) { - return (V) data.remove(key); - } - } - - /** - * Provides the lazy initialization for the {@link DefaultStream} data map. - */ - private static final class LazyPropertyMap implements PropertyMap { - private static final int DEFAULT_INITIAL_SIZE = 4; - private final DefaultStream stream; - - LazyPropertyMap(DefaultStream stream) { - this.stream = stream; - } - - @Override - public Object put(Object key, Object value) { - stream.data = new DefaultProperyMap(DEFAULT_INITIAL_SIZE); - return stream.data.put(key, value); - } - - @Override - public V get(Object key) { - stream.data = new DefaultProperyMap(DEFAULT_INITIAL_SIZE); - return stream.data.get(key); - } - - @Override - public V remove(Object key) { - stream.data = new DefaultProperyMap(DEFAULT_INITIAL_SIZE); - return stream.data.remove(key); + void resizeIfNecessary(int index) { + if (index >= values.length) { + values = Arrays.copyOf(values, propertyKeyRegistry.size()); + } + } } } @@ -1190,4 +1155,43 @@ public class DefaultHttp2Connection implements Http2Connection { return pendingIterations == 0; } } + + /** + * Implementation of {@link PropertyKey} that specifies the index position of the property. + */ + final class DefaultPropertyKey implements PropertyKey { + private final int index; + + DefaultPropertyKey(int index) { + this.index = index; + } + + DefaultPropertyKey verifyConnection(Http2Connection connection) { + if (connection != DefaultHttp2Connection.this) { + throw new IllegalArgumentException("Using a key that was not created by this connection"); + } + return this; + } + } + + /** + * A registry of all stream property keys known by this connection. + */ + private class PropertyKeyRegistry { + final List keys = new ArrayList(4); + + /** + * Registers a new property key. + */ + @SuppressWarnings("unchecked") + DefaultPropertyKey newKey() { + DefaultPropertyKey key = new DefaultPropertyKey(keys.size()); + keys.add(key); + return key; + } + + int size() { + return keys.size(); + } + } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java index b2fbb37037..9001145388 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java @@ -26,11 +26,11 @@ import static io.netty.handler.codec.http2.Http2Exception.streamError; import static io.netty.util.internal.ObjectUtil.checkNotNull; import static java.lang.Math.max; import static java.lang.Math.min; + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException; import io.netty.handler.codec.http2.Http2Exception.StreamException; -import io.netty.handler.codec.http2.Http2Stream.FlowControlState; import io.netty.util.internal.PlatformDependent; /** @@ -45,6 +45,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController private final Http2Connection connection; private final Http2FrameWriter frameWriter; + private final Http2Connection.PropertyKey stateKey; private ChannelHandlerContext ctx; private volatile float windowUpdateRatio; private volatile int initialWindowSize = DEFAULT_WINDOW_SIZE; @@ -60,8 +61,9 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController windowUpdateRatio(windowUpdateRatio); // Add a flow state for the connection. - connection.connectionStream().localFlowState( - new DefaultState(connection.connectionStream(), initialWindowSize)); + stateKey = connection.newKey(); + connection.connectionStream() + .setProperty(stateKey, new DefaultState(connection.connectionStream(), initialWindowSize)); // Register for notification of new streams. connection.addListener(new Http2ConnectionAdapter() { @@ -69,14 +71,14 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController public void onStreamAdded(Http2Stream stream) { // Unconditionally used the reduced flow control state because it requires no object allocation // and the DefaultFlowState will be allocated in onStreamActive. - stream.localFlowState(REDUCED_FLOW_STATE); + stream.setProperty(stateKey, REDUCED_FLOW_STATE); } @Override public void onStreamActive(Http2Stream stream) { // Need to be sure the stream's initial window is adjusted for SETTINGS // frames which may have been exchanged while it was in IDLE - stream.localFlowState(new DefaultState(stream, initialWindowSize)); + stream.setProperty(stateKey, new DefaultState(stream, initialWindowSize)); } @Override @@ -96,7 +98,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController // Unconditionally reduce the amount of memory required for flow control because there is no // object allocation costs associated with doing so and the stream will not have any more // local flow control state to keep track of anymore. - stream.localFlowState(REDUCED_FLOW_STATE); + stream.setProperty(stateKey, REDUCED_FLOW_STATE); } } }); @@ -117,6 +119,16 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController return initialWindowSize; } + @Override + public int windowSize(Http2Stream stream) { + return state(stream).windowSize(); + } + + @Override + public int initialWindowSize(Http2Stream stream) { + return state(stream).initialWindowSize(); + } + @Override public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception { checkNotNull(ctx, "ctx"); @@ -230,11 +242,12 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController } private FlowState connectionState() { - return (FlowState) connection.connectionStream().localFlowState(); + return connection.connectionStream().getProperty(stateKey); } - private static FlowState state(Http2Stream stream) { - return (FlowState) checkNotNull(stream, "stream").localFlowState(); + private FlowState state(Http2Stream stream) { + checkNotNull(stream, "stream"); + return stream.getProperty(stateKey); } private static boolean isClosed(Http2Stream stream) { @@ -352,8 +365,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController } } - @Override - public void returnProcessedBytes(int delta) throws Http2Exception { + private void returnProcessedBytes(int delta) throws Http2Exception { if (processedWindow - delta < window) { throw streamError(stream.id(), INTERNAL_ERROR, "Attempting to return too many bytes for stream %d", stream.id()); @@ -410,6 +422,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController * be exchanged. */ private static final FlowState REDUCED_FLOW_STATE = new FlowState() { + @Override public int windowSize() { return 0; @@ -466,11 +479,6 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController // the peer has not yet acknowledged this peer being activated. } - @Override - public void returnProcessedBytes(int delta) throws Http2Exception { - throw new UnsupportedOperationException(); - } - @Override public void endOfStream(boolean endOfStream) { throw new UnsupportedOperationException(); @@ -478,9 +486,14 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController }; /** - * An abstraction around {@link FlowControlState} which provides specific extensions used by local flow control. + * An abstraction which provides specific extensions used by local flow control. */ - private interface FlowState extends FlowControlState { + private interface FlowState { + + int windowSize(); + + int initialWindowSize(); + void window(int initialWindowSize); /** @@ -516,18 +529,13 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController */ void incrementFlowControlWindows(int delta) throws Http2Exception; - /** - * Returns the processed bytes for this stream. - */ - void returnProcessedBytes(int delta) throws Http2Exception; - void endOfStream(boolean endOfStream); } /** * Provides a means to iterate over all active streams and increment the flow control windows. */ - private static final class WindowUpdateVisitor implements Http2StreamVisitor { + private final class WindowUpdateVisitor implements Http2StreamVisitor { private CompositeStreamException compositeException; private final int delta; diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java index b9eebe859d..5e15a83a93 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java @@ -18,13 +18,13 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR; import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; -import static io.netty.handler.codec.http2.Http2Stream.State.IDLE; import static io.netty.handler.codec.http2.Http2Exception.streamError; +import static io.netty.handler.codec.http2.Http2Stream.State.IDLE; import static io.netty.util.internal.ObjectUtil.checkNotNull; import static java.lang.Math.max; import static java.lang.Math.min; + import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.http2.Http2Stream.FlowControlState; import io.netty.handler.codec.http2.Http2Stream.State; import java.util.ArrayDeque; @@ -35,7 +35,7 @@ import java.util.Deque; * Basic implementation of {@link Http2RemoteFlowController}. */ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController { - private static final Http2StreamVisitor WRITE_ALLOCATED_BYTES = new Http2StreamVisitor() { + private final Http2StreamVisitor WRITE_ALLOCATED_BYTES = new Http2StreamVisitor() { @Override public boolean visit(Http2Stream stream) { state(stream).writeAllocatedBytes(); @@ -43,6 +43,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } }; private final Http2Connection connection; + private final Http2Connection.PropertyKey stateKey; private int initialWindowSize = DEFAULT_WINDOW_SIZE; private ChannelHandlerContext ctx; private boolean needFlush; @@ -51,7 +52,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll this.connection = checkNotNull(connection, "connection"); // Add a flow state for the connection. - connection.connectionStream().remoteFlowState( + stateKey = connection.newKey(); + connection.connectionStream().setProperty(stateKey, new DefaultState(connection.connectionStream(), initialWindowSize)); // Register for notification of new streams. @@ -60,7 +62,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll public void onStreamAdded(Http2Stream stream) { // If the stream state is not open then the stream is not yet eligible for flow controlled frames and // only requires the ReducedFlowState. Otherwise the full amount of memory is required. - stream.remoteFlowState(stream.state() == IDLE ? + stream.setProperty(stateKey, stream.state() == IDLE ? new ReducedState(stream) : new DefaultState(stream, 0)); } @@ -69,10 +71,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll public void onStreamActive(Http2Stream stream) { // If the object was previously created, but later activated then we have to ensure // the full state is allocated and the proper initialWindowSize is used. - if (stream.remoteFlowState().getClass() == DefaultState.class) { - state(stream).window(initialWindowSize); + AbstractState state = state(stream); + if (state.getClass() == DefaultState.class) { + state.window(initialWindowSize); } else { - stream.remoteFlowState(new DefaultState(state(stream), initialWindowSize)); + stream.setProperty(stateKey, new DefaultState(state, initialWindowSize)); } } @@ -87,7 +90,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll // decrease the amount of memory required for this stream because no flow controlled frames can // be exchanged on this stream if (stream.prioritizableForTree() != 0) { - stream.remoteFlowState(new ReducedState(state)); + stream.setProperty(stateKey, new ReducedState(state)); } } @@ -161,6 +164,16 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll return initialWindowSize; } + @Override + public int windowSize(Http2Stream stream) { + return state(stream).windowSize(); + } + + @Override + public int initialWindowSize(Http2Stream stream) { + return state(stream).initialWindowSize(); + } + @Override public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception { if (stream.id() == CONNECTION_STREAM_ID) { @@ -209,12 +222,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll return state(stream).streamableBytesForTree(); } - private static AbstractState state(Http2Stream stream) { - return (AbstractState) checkNotNull(stream, "stream").remoteFlowState(); + private AbstractState state(Http2Stream stream) { + return (AbstractState) checkNotNull(stream, "stream").getProperty(stateKey); } private AbstractState connectionState() { - return (AbstractState) connection.connectionStream().remoteFlowState(); + return (AbstractState) connection.connectionStream().getProperty(stateKey); } /** @@ -261,7 +274,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll * @param connectionWindowSize The connection window this is available for use at this point in the tree. * @return An object summarizing the write and allocation results. */ - static int allocateBytesForTree(Http2Stream parent, int connectionWindowSize) throws Http2Exception { + int allocateBytesForTree(Http2Stream parent, int connectionWindowSize) throws Http2Exception { AbstractState state = state(parent); if (state.streamableBytesForTree() <= 0) { return 0; @@ -289,7 +302,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll * A {@link Http2StreamVisitor} that performs the HTTP/2 priority algorithm to distribute the available connection * window appropriately to the children of a given stream. */ - private static final class ChildFeeder implements Http2StreamVisitor { + private final class ChildFeeder implements Http2StreamVisitor { final int maxSize; int totalWeight; int connectionWindow; @@ -399,7 +412,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll * A simplified version of {@link ChildFeeder} that is only used when all streamable bytes fit within the * available connection window. */ - private static final class SimpleChildFeeder implements Http2StreamVisitor { + private final class SimpleChildFeeder implements Http2StreamVisitor { int bytesAllocated; int connectionWindow; @@ -437,35 +450,35 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll // Set to true if cancel() was called. private boolean cancelled; - public DefaultState(Http2Stream stream, int initialWindowSize) { + DefaultState(Http2Stream stream, int initialWindowSize) { super(stream); window(initialWindowSize); pendingWriteQueue = new ArrayDeque(2); } - public DefaultState(AbstractState existingState, int initialWindowSize) { + DefaultState(AbstractState existingState, int initialWindowSize) { super(existingState); window(initialWindowSize); pendingWriteQueue = new ArrayDeque(2); } @Override - public int windowSize() { + int windowSize() { return window; } @Override - public int initialWindowSize() { + int initialWindowSize() { return initialWindowSize; } @Override - public void window(int initialWindowSize) { + void window(int initialWindowSize) { window = initialWindowSize; } @Override - public void allocate(int bytes) { + void allocate(int bytes) { allocated += bytes; // Also artificially reduce the streamable bytes for this tree to give the appearance // that the data has been written. This will be restored before the allocated bytes are @@ -474,7 +487,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } @Override - public void writeAllocatedBytes() { + void writeAllocatedBytes() { int numBytes = allocated; // Restore the number of streamable bytes to this branch. @@ -493,7 +506,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } @Override - public int incrementStreamWindow(int delta) throws Http2Exception { + int incrementStreamWindow(int delta) throws Http2Exception { if (delta > 0 && Integer.MAX_VALUE - delta < window) { throw streamError(stream.id(), FLOW_CONTROL_ERROR, "Window size overflow for stream: %d", stream.id()); @@ -510,28 +523,28 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } @Override - public int writableWindow() { + int writableWindow() { return min(window, connectionWindowSize()); } @Override - public int streamableBytes() { + int streamableBytes() { return max(0, min(pendingBytes - allocated, window)); } @Override - public int streamableBytesForTree() { + int streamableBytesForTree() { return streamableBytesForTree; } @Override - public void enqueueFrame(FlowControlled frame) { + void enqueueFrame(FlowControlled frame) { incrementPendingBytes(frame.size()); pendingWriteQueue.offer(frame); } @Override - public boolean hasFrame() { + boolean hasFrame() { return !pendingWriteQueue.isEmpty(); } @@ -543,7 +556,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } @Override - public void cancel() { + void cancel() { cancel(null); } @@ -568,7 +581,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } @Override - public int writeBytes(int bytes) { + int writeBytes(int bytes) { int bytesAttempted = 0; while (hasFrame()) { int maxBytes = min(bytes - bytesAttempted, writableWindow()); @@ -683,94 +696,94 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll * The remote flow control state for a single stream that is not in a state where flow controlled frames cannot * be exchanged. */ - private static final class ReducedState extends AbstractState { - public ReducedState(Http2Stream stream) { + private final class ReducedState extends AbstractState { + ReducedState(Http2Stream stream) { super(stream); } - public ReducedState(AbstractState existingState) { + ReducedState(AbstractState existingState) { super(existingState); } @Override - public int windowSize() { + int windowSize() { return 0; } @Override - public int initialWindowSize() { + int initialWindowSize() { return 0; } @Override - public int writableWindow() { + int writableWindow() { return 0; } @Override - public int streamableBytes() { + int streamableBytes() { return 0; } @Override - public int streamableBytesForTree() { + int streamableBytesForTree() { return streamableBytesForTree; } @Override - public void writeAllocatedBytes() { + void writeAllocatedBytes() { throw new UnsupportedOperationException(); } @Override - public void cancel() { + void cancel() { } @Override - public void window(int initialWindowSize) { + void window(int initialWindowSize) { throw new UnsupportedOperationException(); } @Override - public int incrementStreamWindow(int delta) throws Http2Exception { + int incrementStreamWindow(int delta) throws Http2Exception { // This operation needs to be supported during the initial settings exchange when // the peer has not yet acknowledged this peer being activated. return 0; } @Override - public int writeBytes(int bytes) { + int writeBytes(int bytes) { throw new UnsupportedOperationException(); } @Override - public void enqueueFrame(FlowControlled frame) { + void enqueueFrame(FlowControlled frame) { throw new UnsupportedOperationException(); } @Override - public void allocate(int bytes) { + void allocate(int bytes) { throw new UnsupportedOperationException(); } @Override - public boolean hasFrame() { + boolean hasFrame() { return false; } } /** - * An abstraction around {@link FlowControlState} which provides specific extensions used by remote flow control. + * An abstraction which provides specific extensions used by remote flow control. */ - private abstract static class AbstractState implements FlowControlState { + private abstract class AbstractState { protected final Http2Stream stream; protected int streamableBytesForTree; - public AbstractState(Http2Stream stream) { + AbstractState(Http2Stream stream) { this.stream = stream; } - public AbstractState(AbstractState existingState) { + AbstractState(AbstractState existingState) { this.stream = existingState.stream(); this.streamableBytesForTree = existingState.streamableBytesForTree(); } @@ -778,7 +791,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll /** * The stream this state is associated with. */ - public final Http2Stream stream() { + final Http2Stream stream() { return stream; } @@ -786,72 +799,76 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll * Recursively increments the {@link #streamableBytesForTree()} for this branch in the priority tree starting * at the current node. */ - public final void incrementStreamableBytesForTree(int numBytes) { + final void incrementStreamableBytesForTree(int numBytes) { streamableBytesForTree += numBytes; if (!stream.isRoot()) { state(stream.parent()).incrementStreamableBytesForTree(numBytes); } } + abstract int windowSize(); + + abstract int initialWindowSize(); + /** * Write bytes allocated bytes for this stream. */ - public abstract void writeAllocatedBytes(); + abstract void writeAllocatedBytes(); /** * Returns the number of pending bytes for this node that will fit within the - * {@link #availableWindow()}. This is used for the priority algorithm to determine the aggregate + * {@link #writableWindow()}. This is used for the priority algorithm to determine the aggregate * number of bytes that can be written at each node. Each node only takes into account its * stream window so that when a change occurs to the connection window, these values need * not change (i.e. no tree traversal is required). */ - public abstract int streamableBytes(); + abstract int streamableBytes(); /** * Get the {@link #streamableBytes()} for the entire tree rooted at this node. */ - public abstract int streamableBytesForTree(); + abstract int streamableBytesForTree(); /** * Any operations that may be pending are cleared and the status of these operations is failed. */ - public abstract void cancel(); + abstract void cancel(); /** - * Reset the {@link #availableWindow()} size. + * Reset the window size for this stream. */ - public abstract void window(int initialWindowSize); + abstract void window(int initialWindowSize); /** * Increments the flow control window for this stream by the given delta and returns the new value. */ - public abstract int incrementStreamWindow(int delta) throws Http2Exception; + abstract int incrementStreamWindow(int delta) throws Http2Exception; /** * Returns the maximum writable window (minimum of the stream and connection windows). */ - public abstract int writableWindow(); + abstract int writableWindow(); /** * Writes up to the number of bytes from the pending queue. May write less if limited by the writable window, by * the number of pending writes available, or because a frame does not support splitting on arbitrary * boundaries. */ - public abstract int writeBytes(int bytes); + abstract int writeBytes(int bytes); /** * Adds the {@code frame} to the pending queue and increments the pending byte count. */ - public abstract void enqueueFrame(FlowControlled frame); + abstract void enqueueFrame(FlowControlled frame); /** * Increment the number of bytes allocated to this stream by the priority algorithm */ - public abstract void allocate(int bytes); + abstract void allocate(int bytes); /** * Indicates whether or not there are frames in the pending queue. */ - public abstract boolean hasFrame(); + abstract boolean hasFrame(); } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java index c29aa41431..2fd5c3f798 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java @@ -24,6 +24,7 @@ import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP; import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.streamError; import static io.netty.util.internal.ObjectUtil.checkNotNull; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; @@ -38,19 +39,11 @@ import io.netty.util.ByteString; * stream. The decompression provided by this class will be applied to the data for the entire stream. */ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecorator { - private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() { - @Override - public void onStreamRemoved(Http2Stream stream) { - final Http2Decompressor decompressor = decompressor(stream); - if (decompressor != null) { - cleanup(stream, decompressor); - } - } - }; private final Http2Connection connection; private final boolean strict; private boolean flowControllerInitialized; + final Http2Connection.PropertyKey propertyKey; public DelegatingDecompressorFrameListener(Http2Connection connection, Http2FrameListener listener) { this(connection, listener, true); @@ -62,7 +55,16 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor this.connection = connection; this.strict = strict; - connection.addListener(CLEAN_UP_LISTENER); + propertyKey = connection.newKey(); + connection.addListener(new Http2ConnectionAdapter() { + @Override + public void onStreamRemoved(Http2Stream stream) { + final Http2Decompressor decompressor = decompressor(stream); + if (decompressor != null) { + cleanup(stream, decompressor); + } + } + }); } @Override @@ -210,7 +212,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor final EmbeddedChannel channel = newContentDecompressor(contentEncoding); if (channel != null) { decompressor = new Http2Decompressor(channel); - stream.setProperty(Http2Decompressor.class, decompressor); + stream.setProperty(propertyKey, decompressor); // Decode the content and remove or replace the existing headers // so that the message looks like a decoded message. ByteString targetContentEncoding = getTargetContentEncoding(contentEncoding); @@ -237,8 +239,8 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor } } - private static Http2Decompressor decompressor(Http2Stream stream) { - return (Http2Decompressor) (stream == null? null : stream.getProperty(Http2Decompressor.class)); + Http2Decompressor decompressor(Http2Stream stream) { + return stream == null ? null : (Http2Decompressor) stream.getProperty(propertyKey); } /** @@ -248,7 +250,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor * @param stream The stream for which {@code decompressor} is the decompressor for * @param decompressor The decompressor for {@code stream} */ - private static void cleanup(Http2Stream stream, Http2Decompressor decompressor) { + private void cleanup(Http2Stream stream, Http2Decompressor decompressor) { final EmbeddedChannel channel = decompressor.decompressor(); if (channel.finish()) { for (;;) { @@ -259,7 +261,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor buf.release(); } } - decompressor = stream.removeProperty(Http2Decompressor.class); + decompressor = stream.removeProperty(propertyKey); } /** @@ -286,7 +288,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor /** * A decorator around the local flow controller that converts consumed bytes from uncompressed to compressed. */ - private static final class ConsumedBytesConverter implements Http2LocalFlowController { + private final class ConsumedBytesConverter implements Http2LocalFlowController { private final Http2LocalFlowController flowController; ConsumedBytesConverter(Http2LocalFlowController flowController) { @@ -303,6 +305,16 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor return flowController.initialWindowSize(); } + @Override + public int windowSize(Http2Stream stream) { + return flowController.windowSize(stream); + } + + @Override + public int initialWindowSize(Http2Stream stream) { + return flowController.initialWindowSize(stream); + } + @Override public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception { @@ -330,12 +342,12 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor flowController.consumeBytes(ctx, stream, numBytes); } catch (Http2Exception e) { if (copy != null) { - stream.setProperty(Http2Decompressor.class, copy); + stream.setProperty(propertyKey, copy); } throw e; } catch (Throwable t) { if (copy != null) { - stream.setProperty(Http2Decompressor.class, copy); + stream.setProperty(propertyKey, copy); } throw new Http2Exception(INTERNAL_ERROR, "Error while returning bytes to flow control window", t); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java index a71f36df64..0acd1146a5 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java @@ -278,6 +278,17 @@ public interface Http2Connection { Endpoint opposite(); } + /** + * A key to be used for associating application-defined properties with streams within this connection. + */ + interface PropertyKey { + } + + /** + * Creates a new key that is unique within this {@link Http2Connection}. + */ + PropertyKey newKey(); + /** * Adds a listener of stream life-cycle events. */ diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FlowController.java index ed4a5c453e..7f2a1814c3 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FlowController.java @@ -22,8 +22,8 @@ import io.netty.channel.ChannelHandlerContext; public interface Http2FlowController { /** - * Sets the initial flow control window and updates all stream windows (but not the connection - * window) by the delta. + * Sets the connection-wide initial flow control window and updates all stream windows (but not the connection + * stream window) by the delta. *

* This method is used to apply the {@code SETTINGS_INITIAL_WINDOW_SIZE} value for an * {@code SETTINGS} frame. @@ -34,11 +34,24 @@ public interface Http2FlowController { void initialWindowSize(int newWindowSize) throws Http2Exception; /** - * Gets the initial flow control window size that is used as the basis for new stream flow + * Gets the connection-wide initial flow control window size that is used as the basis for new stream flow * control windows. */ int initialWindowSize(); + /** + * Get the portion of the flow control window for the given stream that is currently available for sending/receiving + * frames which are subject to flow control. This quantity is measured in number of bytes. + */ + int windowSize(Http2Stream stream); + + /** + * Get the initial flow control window size for the given stream. This quantity is measured in number of bytes. Note + * the unavailable window portion can be calculated by {@link #initialWindowSize()} - {@link + * #windowSize(Http2Stream)}. + */ + int initialWindowSize(Http2Stream stream); + /** * Increments the size of the stream's flow control window by the given delta. *

diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java index 30b860f3ca..5916f28782 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java @@ -33,24 +33,6 @@ public interface Http2Stream { CLOSED } - /** - * Represents the state which flow controller implementations are expected to track. - */ - interface FlowControlState { - /** - * Get the portion of the flow control window that is available for sending/receiving frames which are subject - * to flow control. This quantity is measured in number of bytes. - */ - int windowSize(); - - /** - * Get the initial flow control window size. This quantity is measured in number of bytes. - * Note the unavailable window portion can be calculated by - * {@link #initialWindowSize()} - {@link #windowSize()}. - */ - int initialWindowSize(); - } - /** * Gets the unique identifier for this stream within the connection. */ @@ -61,26 +43,6 @@ public interface Http2Stream { */ State state(); - /** - * Get the state as related to the {@link Http2LocalFlowController}. - */ - FlowControlState localFlowState(); - - /** - * Set the state as related to the {@link Http2LocalFlowController}. - */ - void localFlowState(FlowControlState state); - - /** - * Get the state as related to {@link Http2RemoteFlowController}. - */ - FlowControlState remoteFlowState(); - - /** - * Set the state as related to {@link Http2RemoteFlowController}. - */ - void remoteFlowState(FlowControlState state); - /** * Opens this stream, making it available via {@link Http2Connection#forEachActiveStream(Http2StreamVisitor)} and * transition state to: @@ -140,17 +102,17 @@ public interface Http2Stream { * Associates the application-defined data with this stream. * @return The value that was previously associated with {@code key}, or {@code null} if there was none. */ - Object setProperty(Object key, Object value); + V setProperty(Http2Connection.PropertyKey key, V value); /** * Returns application-defined data if any was associated with this stream. */ - V getProperty(Object key); + V getProperty(Http2Connection.PropertyKey key); /** * Returns and removes application-defined data if any was associated with this stream. */ - V removeProperty(Object key); + V removeProperty(Http2Connection.PropertyKey key); /** * Updates an priority for this stream. Calling this method may affect the straucture of the diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java index d1e8296cef..30400d72ee 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java @@ -67,6 +67,7 @@ public class DefaultHttp2LocalFlowControllerTest { connection = new DefaultHttp2Connection(false); controller = new DefaultHttp2LocalFlowController(connection, frameWriter, updateRatio); + connection.local().flowController(controller); connection.local().createStream(STREAM_ID, false); } @@ -320,7 +321,7 @@ public class DefaultHttp2LocalFlowControllerTest { } private int window(int streamId) throws Http2Exception { - return stream(streamId).localFlowState().windowSize(); + return controller.windowSize(stream(streamId)); } private Http2Stream stream(int streamId) { diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java index bcc3dda5e0..b69b3bcdac 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java @@ -86,6 +86,7 @@ public class DefaultHttp2RemoteFlowControllerTest { connection = new DefaultHttp2Connection(false); controller = new DefaultHttp2RemoteFlowController(connection); + connection.remote().flowController(controller); connection.local().createStream(STREAM_A, false); connection.local().createStream(STREAM_B, false); @@ -1222,7 +1223,7 @@ public class DefaultHttp2RemoteFlowControllerTest { } private int window(int streamId) throws Http2Exception { - return stream(streamId).remoteFlowState().windowSize(); + return controller.windowSize(stream(streamId)); } private void incrementWindowSize(int streamId, int delta) throws Http2Exception { diff --git a/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2LocalFlowController.java b/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2LocalFlowController.java index 92237eeeb0..816d4768fb 100644 --- a/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2LocalFlowController.java +++ b/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2LocalFlowController.java @@ -15,6 +15,7 @@ package io.netty.microbench.http2; import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE; + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http2.Http2Exception; @@ -35,6 +36,16 @@ public final class NoopHttp2LocalFlowController implements Http2LocalFlowControl return MAX_INITIAL_WINDOW_SIZE; } + @Override + public int windowSize(Http2Stream stream) { + return MAX_INITIAL_WINDOW_SIZE; + } + + @Override + public int initialWindowSize(Http2Stream stream) { + return MAX_INITIAL_WINDOW_SIZE; + } + @Override public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception { diff --git a/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2RemoteFlowController.java b/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2RemoteFlowController.java index 1dbd6f55bc..94ccd6d25a 100644 --- a/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2RemoteFlowController.java +++ b/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2RemoteFlowController.java @@ -15,6 +15,7 @@ package io.netty.microbench.http2; import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE; + import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2RemoteFlowController; @@ -24,6 +25,7 @@ public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowContr public static final NoopHttp2RemoteFlowController INSTANCE = new NoopHttp2RemoteFlowController(); private NoopHttp2RemoteFlowController() { } + @Override public void initialWindowSize(int newWindowSize) throws Http2Exception { } @@ -33,6 +35,16 @@ public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowContr return MAX_INITIAL_WINDOW_SIZE; } + @Override + public int windowSize(Http2Stream stream) { + return MAX_INITIAL_WINDOW_SIZE; + } + + @Override + public int initialWindowSize(Http2Stream stream) { + return MAX_INITIAL_WINDOW_SIZE; + } + @Override public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception {