Optimizing user-defined stream properties.
Motivation: Streams currently maintain a hash map of user-defined properties, which has been shown to add significant memory overhead as well as being a performance bottleneck for lookup of frequently used properties. Modifications: Modifying the connection/stream to use an array as the storage of user-defined properties, indexed by the class that identifies the index into the array where the property is stored. Result: Stream processing performance should be improved.
This commit is contained in:
parent
69558999eb
commit
c98195714d
@ -38,16 +38,6 @@ import io.netty.util.ByteString;
|
||||
* stream. The compression provided by this class will be applied to the data for the entire stream.
|
||||
*/
|
||||
public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
|
||||
private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() {
|
||||
@Override
|
||||
public void onStreamRemoved(Http2Stream stream) {
|
||||
final EmbeddedChannel compressor = stream.getProperty(CompressorHttp2ConnectionEncoder.class);
|
||||
if (compressor != null) {
|
||||
cleanup(stream, compressor);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
public static final int DEFAULT_COMPRESSION_LEVEL = 6;
|
||||
public static final int DEFAULT_WINDOW_BITS = 15;
|
||||
public static final int DEFAULT_MEM_LEVEL = 8;
|
||||
@ -55,6 +45,7 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE
|
||||
private final int compressionLevel;
|
||||
private final int windowBits;
|
||||
private final int memLevel;
|
||||
private final Http2Connection.PropertyKey propertyKey;
|
||||
|
||||
public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate) {
|
||||
this(delegate, DEFAULT_COMPRESSION_LEVEL, DEFAULT_WINDOW_BITS, DEFAULT_MEM_LEVEL);
|
||||
@ -76,15 +67,23 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE
|
||||
this.windowBits = windowBits;
|
||||
this.memLevel = memLevel;
|
||||
|
||||
connection().addListener(CLEAN_UP_LISTENER);
|
||||
propertyKey = connection().newKey();
|
||||
connection().addListener(new Http2ConnectionAdapter() {
|
||||
@Override
|
||||
public void onStreamRemoved(Http2Stream stream) {
|
||||
final EmbeddedChannel compressor = stream.getProperty(propertyKey);
|
||||
if (compressor != null) {
|
||||
cleanup(stream, compressor);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
|
||||
final boolean endOfStream, ChannelPromise promise) {
|
||||
final Http2Stream stream = connection().stream(streamId);
|
||||
final EmbeddedChannel channel = stream == null ? null :
|
||||
(EmbeddedChannel) stream.getProperty(CompressorHttp2ConnectionEncoder.class);
|
||||
final EmbeddedChannel channel = stream == null ? null : (EmbeddedChannel) stream.getProperty(propertyKey);
|
||||
if (channel == null) {
|
||||
// The compressor may be null if no compatible encoding type was found in this stream's headers
|
||||
return super.writeData(ctx, streamId, data, padding, endOfStream, promise);
|
||||
@ -216,7 +215,7 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE
|
||||
return;
|
||||
}
|
||||
|
||||
EmbeddedChannel compressor = stream.getProperty(CompressorHttp2ConnectionEncoder.class);
|
||||
EmbeddedChannel compressor = stream.getProperty(propertyKey);
|
||||
if (compressor == null) {
|
||||
if (!endOfStream) {
|
||||
ByteString encoding = headers.get(CONTENT_ENCODING);
|
||||
@ -226,7 +225,7 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE
|
||||
try {
|
||||
compressor = newContentCompressor(encoding);
|
||||
if (compressor != null) {
|
||||
stream.setProperty(CompressorHttp2ConnectionEncoder.class, compressor);
|
||||
stream.setProperty(propertyKey, compressor);
|
||||
ByteString targetContentEncoding = getTargetContentEncoding(encoding);
|
||||
if (IDENTITY.equals(targetContentEncoding)) {
|
||||
headers.remove(CONTENT_ENCODING);
|
||||
@ -256,7 +255,7 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE
|
||||
* @param stream The stream for which {@code compressor} is the compressor for
|
||||
* @param compressor The compressor for {@code stream}
|
||||
*/
|
||||
private static void cleanup(Http2Stream stream, EmbeddedChannel compressor) {
|
||||
void cleanup(Http2Stream stream, EmbeddedChannel compressor) {
|
||||
if (compressor.finish()) {
|
||||
for (;;) {
|
||||
final ByteBuf buf = compressor.readOutbound();
|
||||
@ -267,7 +266,7 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE
|
||||
buf.release();
|
||||
}
|
||||
}
|
||||
stream.removeProperty(CompressorHttp2ConnectionEncoder.class);
|
||||
stream.removeProperty(propertyKey);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -37,6 +37,7 @@ import io.netty.handler.codec.http2.Http2Stream.State;
|
||||
import io.netty.util.collection.IntObjectHashMap;
|
||||
import io.netty.util.collection.IntObjectMap;
|
||||
import io.netty.util.collection.PrimitiveCollections;
|
||||
import io.netty.util.internal.EmptyArrays;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.SystemPropertyUtil;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
@ -44,11 +45,10 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
|
||||
@ -59,9 +59,11 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2Connection.class);
|
||||
// Fields accessed by inner classes
|
||||
final IntObjectMap<Http2Stream> streamMap = new IntObjectHashMap<Http2Stream>();
|
||||
final PropertyKeyRegistry propertyKeyRegistry = new PropertyKeyRegistry();
|
||||
final ConnectionStream connectionStream = new ConnectionStream();
|
||||
final DefaultEndpoint<Http2LocalFlowController> localEndpoint;
|
||||
final DefaultEndpoint<Http2RemoteFlowController> remoteEndpoint;
|
||||
|
||||
/**
|
||||
* The initial size of the children map is chosen to be conservative on initial memory allocations under
|
||||
* the assumption that most streams will have a small number of children. This choice may be
|
||||
@ -265,11 +267,28 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public PropertyKey newKey() {
|
||||
return propertyKeyRegistry.newKey();
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that the key is valid and returns it as the internal {@link DefaultPropertyKey} type.
|
||||
*
|
||||
* @throws NullPointerException if the key is {@code null}.
|
||||
* @throws ClassCastException if the key is not of type {@link DefaultPropertyKey}.
|
||||
* @throws IllegalArgumentException if the key was not created by this connection.
|
||||
*/
|
||||
final DefaultPropertyKey verifyKey(PropertyKey key) {
|
||||
return checkNotNull((DefaultPropertyKey) key, "key").verifyConnection(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple stream implementation. Streams can be compared to each other by priority.
|
||||
*/
|
||||
private class DefaultStream implements Http2Stream {
|
||||
private final int id;
|
||||
private final PropertyMap properties = new PropertyMap();
|
||||
private State state;
|
||||
private short weight = DEFAULT_PRIORITY_WEIGHT;
|
||||
private DefaultStream parent;
|
||||
@ -277,14 +296,10 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
private int totalChildWeights;
|
||||
private int prioritizableForTree = 1;
|
||||
private boolean resetSent;
|
||||
private PropertyMap data;
|
||||
private FlowControlState localFlowState;
|
||||
private FlowControlState remoteFlowState;
|
||||
|
||||
DefaultStream(int id, State state) {
|
||||
this.id = id;
|
||||
this.state = state;
|
||||
data = new LazyPropertyMap(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -297,26 +312,6 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
return state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FlowControlState localFlowState() {
|
||||
return localFlowState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void localFlowState(FlowControlState state) {
|
||||
localFlowState = state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FlowControlState remoteFlowState() {
|
||||
return remoteFlowState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remoteFlowState(FlowControlState state) {
|
||||
remoteFlowState = state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isResetSent() {
|
||||
return resetSent;
|
||||
@ -329,18 +324,18 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Object setProperty(Object key, Object value) {
|
||||
return data.put(key, value);
|
||||
public final <V> V setProperty(PropertyKey key, V value) {
|
||||
return properties.add(verifyKey(key), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final <V> V getProperty(Object key) {
|
||||
return data.get(key);
|
||||
public final <V> V getProperty(PropertyKey key) {
|
||||
return properties.get(verifyKey(key));
|
||||
}
|
||||
|
||||
@Override
|
||||
public final <V> V removeProperty(Object key) {
|
||||
return data.remove(key);
|
||||
public final <V> V removeProperty(PropertyKey key) {
|
||||
return properties.remove(verifyKey(key));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -694,74 +689,44 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows the data map to be lazily initialized for {@link DefaultStream}.
|
||||
*/
|
||||
private interface PropertyMap {
|
||||
Object put(Object key, Object value);
|
||||
/**
|
||||
* Provides the lazy initialization for the {@link DefaultStream} data map.
|
||||
*/
|
||||
private class PropertyMap {
|
||||
Object[] values = EmptyArrays.EMPTY_OBJECTS;
|
||||
|
||||
<V> V get(Object key);
|
||||
<V> V add(DefaultPropertyKey key, V value) {
|
||||
resizeIfNecessary(key.index);
|
||||
@SuppressWarnings("unchecked")
|
||||
V prevValue = (V) values[key.index];
|
||||
values[key.index] = value;
|
||||
return prevValue;
|
||||
}
|
||||
|
||||
<V> V remove(Object key);
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
<V> V get(DefaultPropertyKey key) {
|
||||
if (key.index >= values.length) {
|
||||
return null;
|
||||
}
|
||||
return (V) values[key.index];
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides actual {@link HashMap} functionality for {@link DefaultStream}'s application data.
|
||||
*/
|
||||
private static final class DefaultProperyMap implements PropertyMap {
|
||||
private final Map<Object, Object> data;
|
||||
@SuppressWarnings("unchecked")
|
||||
<V> V remove(DefaultPropertyKey key) {
|
||||
V prevValue = null;
|
||||
if (key.index < values.length) {
|
||||
prevValue = (V) values[key.index];
|
||||
values[key.index] = null;
|
||||
}
|
||||
return prevValue;
|
||||
}
|
||||
|
||||
DefaultProperyMap(int initialSize) {
|
||||
data = new HashMap<Object, Object>(initialSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object put(Object key, Object value) {
|
||||
return data.put(key, value);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <V> V get(Object key) {
|
||||
return (V) data.get(key);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <V> V remove(Object key) {
|
||||
return (V) data.remove(key);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides the lazy initialization for the {@link DefaultStream} data map.
|
||||
*/
|
||||
private static final class LazyPropertyMap implements PropertyMap {
|
||||
private static final int DEFAULT_INITIAL_SIZE = 4;
|
||||
private final DefaultStream stream;
|
||||
|
||||
LazyPropertyMap(DefaultStream stream) {
|
||||
this.stream = stream;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object put(Object key, Object value) {
|
||||
stream.data = new DefaultProperyMap(DEFAULT_INITIAL_SIZE);
|
||||
return stream.data.put(key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> V get(Object key) {
|
||||
stream.data = new DefaultProperyMap(DEFAULT_INITIAL_SIZE);
|
||||
return stream.data.get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> V remove(Object key) {
|
||||
stream.data = new DefaultProperyMap(DEFAULT_INITIAL_SIZE);
|
||||
return stream.data.remove(key);
|
||||
void resizeIfNecessary(int index) {
|
||||
if (index >= values.length) {
|
||||
values = Arrays.copyOf(values, propertyKeyRegistry.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1190,4 +1155,43 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
return pendingIterations == 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementation of {@link PropertyKey} that specifies the index position of the property.
|
||||
*/
|
||||
final class DefaultPropertyKey implements PropertyKey {
|
||||
private final int index;
|
||||
|
||||
DefaultPropertyKey(int index) {
|
||||
this.index = index;
|
||||
}
|
||||
|
||||
DefaultPropertyKey verifyConnection(Http2Connection connection) {
|
||||
if (connection != DefaultHttp2Connection.this) {
|
||||
throw new IllegalArgumentException("Using a key that was not created by this connection");
|
||||
}
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A registry of all stream property keys known by this connection.
|
||||
*/
|
||||
private class PropertyKeyRegistry {
|
||||
final List<DefaultPropertyKey> keys = new ArrayList<DefaultPropertyKey>(4);
|
||||
|
||||
/**
|
||||
* Registers a new property key.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
DefaultPropertyKey newKey() {
|
||||
DefaultPropertyKey key = new DefaultPropertyKey(keys.size());
|
||||
keys.add(key);
|
||||
return key;
|
||||
}
|
||||
|
||||
int size() {
|
||||
return keys.size();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -26,11 +26,11 @@ import static io.netty.handler.codec.http2.Http2Exception.streamError;
|
||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||
import static java.lang.Math.max;
|
||||
import static java.lang.Math.min;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
|
||||
import io.netty.handler.codec.http2.Http2Exception.StreamException;
|
||||
import io.netty.handler.codec.http2.Http2Stream.FlowControlState;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
|
||||
/**
|
||||
@ -45,6 +45,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
|
||||
|
||||
private final Http2Connection connection;
|
||||
private final Http2FrameWriter frameWriter;
|
||||
private final Http2Connection.PropertyKey stateKey;
|
||||
private ChannelHandlerContext ctx;
|
||||
private volatile float windowUpdateRatio;
|
||||
private volatile int initialWindowSize = DEFAULT_WINDOW_SIZE;
|
||||
@ -60,8 +61,9 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
|
||||
windowUpdateRatio(windowUpdateRatio);
|
||||
|
||||
// Add a flow state for the connection.
|
||||
connection.connectionStream().localFlowState(
|
||||
new DefaultState(connection.connectionStream(), initialWindowSize));
|
||||
stateKey = connection.newKey();
|
||||
connection.connectionStream()
|
||||
.setProperty(stateKey, new DefaultState(connection.connectionStream(), initialWindowSize));
|
||||
|
||||
// Register for notification of new streams.
|
||||
connection.addListener(new Http2ConnectionAdapter() {
|
||||
@ -69,14 +71,14 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
|
||||
public void onStreamAdded(Http2Stream stream) {
|
||||
// Unconditionally used the reduced flow control state because it requires no object allocation
|
||||
// and the DefaultFlowState will be allocated in onStreamActive.
|
||||
stream.localFlowState(REDUCED_FLOW_STATE);
|
||||
stream.setProperty(stateKey, REDUCED_FLOW_STATE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStreamActive(Http2Stream stream) {
|
||||
// Need to be sure the stream's initial window is adjusted for SETTINGS
|
||||
// frames which may have been exchanged while it was in IDLE
|
||||
stream.localFlowState(new DefaultState(stream, initialWindowSize));
|
||||
stream.setProperty(stateKey, new DefaultState(stream, initialWindowSize));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -96,7 +98,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
|
||||
// Unconditionally reduce the amount of memory required for flow control because there is no
|
||||
// object allocation costs associated with doing so and the stream will not have any more
|
||||
// local flow control state to keep track of anymore.
|
||||
stream.localFlowState(REDUCED_FLOW_STATE);
|
||||
stream.setProperty(stateKey, REDUCED_FLOW_STATE);
|
||||
}
|
||||
}
|
||||
});
|
||||
@ -117,6 +119,16 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
|
||||
return initialWindowSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int windowSize(Http2Stream stream) {
|
||||
return state(stream).windowSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int initialWindowSize(Http2Stream stream) {
|
||||
return state(stream).initialWindowSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception {
|
||||
checkNotNull(ctx, "ctx");
|
||||
@ -230,11 +242,12 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
|
||||
}
|
||||
|
||||
private FlowState connectionState() {
|
||||
return (FlowState) connection.connectionStream().localFlowState();
|
||||
return connection.connectionStream().getProperty(stateKey);
|
||||
}
|
||||
|
||||
private static FlowState state(Http2Stream stream) {
|
||||
return (FlowState) checkNotNull(stream, "stream").localFlowState();
|
||||
private FlowState state(Http2Stream stream) {
|
||||
checkNotNull(stream, "stream");
|
||||
return stream.getProperty(stateKey);
|
||||
}
|
||||
|
||||
private static boolean isClosed(Http2Stream stream) {
|
||||
@ -352,8 +365,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void returnProcessedBytes(int delta) throws Http2Exception {
|
||||
private void returnProcessedBytes(int delta) throws Http2Exception {
|
||||
if (processedWindow - delta < window) {
|
||||
throw streamError(stream.id(), INTERNAL_ERROR,
|
||||
"Attempting to return too many bytes for stream %d", stream.id());
|
||||
@ -410,6 +422,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
|
||||
* be exchanged.
|
||||
*/
|
||||
private static final FlowState REDUCED_FLOW_STATE = new FlowState() {
|
||||
|
||||
@Override
|
||||
public int windowSize() {
|
||||
return 0;
|
||||
@ -466,11 +479,6 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
|
||||
// the peer has not yet acknowledged this peer being activated.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void returnProcessedBytes(int delta) throws Http2Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void endOfStream(boolean endOfStream) {
|
||||
throw new UnsupportedOperationException();
|
||||
@ -478,9 +486,14 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
|
||||
};
|
||||
|
||||
/**
|
||||
* An abstraction around {@link FlowControlState} which provides specific extensions used by local flow control.
|
||||
* An abstraction which provides specific extensions used by local flow control.
|
||||
*/
|
||||
private interface FlowState extends FlowControlState {
|
||||
private interface FlowState {
|
||||
|
||||
int windowSize();
|
||||
|
||||
int initialWindowSize();
|
||||
|
||||
void window(int initialWindowSize);
|
||||
|
||||
/**
|
||||
@ -516,18 +529,13 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
|
||||
*/
|
||||
void incrementFlowControlWindows(int delta) throws Http2Exception;
|
||||
|
||||
/**
|
||||
* Returns the processed bytes for this stream.
|
||||
*/
|
||||
void returnProcessedBytes(int delta) throws Http2Exception;
|
||||
|
||||
void endOfStream(boolean endOfStream);
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides a means to iterate over all active streams and increment the flow control windows.
|
||||
*/
|
||||
private static final class WindowUpdateVisitor implements Http2StreamVisitor {
|
||||
private final class WindowUpdateVisitor implements Http2StreamVisitor {
|
||||
private CompositeStreamException compositeException;
|
||||
private final int delta;
|
||||
|
||||
|
@ -18,13 +18,13 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
|
||||
import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
|
||||
import static io.netty.handler.codec.http2.Http2Exception.streamError;
|
||||
import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
|
||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||
import static java.lang.Math.max;
|
||||
import static java.lang.Math.min;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.http2.Http2Stream.FlowControlState;
|
||||
import io.netty.handler.codec.http2.Http2Stream.State;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
@ -35,7 +35,7 @@ import java.util.Deque;
|
||||
* Basic implementation of {@link Http2RemoteFlowController}.
|
||||
*/
|
||||
public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController {
|
||||
private static final Http2StreamVisitor WRITE_ALLOCATED_BYTES = new Http2StreamVisitor() {
|
||||
private final Http2StreamVisitor WRITE_ALLOCATED_BYTES = new Http2StreamVisitor() {
|
||||
@Override
|
||||
public boolean visit(Http2Stream stream) {
|
||||
state(stream).writeAllocatedBytes();
|
||||
@ -43,6 +43,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
}
|
||||
};
|
||||
private final Http2Connection connection;
|
||||
private final Http2Connection.PropertyKey stateKey;
|
||||
private int initialWindowSize = DEFAULT_WINDOW_SIZE;
|
||||
private ChannelHandlerContext ctx;
|
||||
private boolean needFlush;
|
||||
@ -51,7 +52,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
this.connection = checkNotNull(connection, "connection");
|
||||
|
||||
// Add a flow state for the connection.
|
||||
connection.connectionStream().remoteFlowState(
|
||||
stateKey = connection.newKey();
|
||||
connection.connectionStream().setProperty(stateKey,
|
||||
new DefaultState(connection.connectionStream(), initialWindowSize));
|
||||
|
||||
// Register for notification of new streams.
|
||||
@ -60,7 +62,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
public void onStreamAdded(Http2Stream stream) {
|
||||
// If the stream state is not open then the stream is not yet eligible for flow controlled frames and
|
||||
// only requires the ReducedFlowState. Otherwise the full amount of memory is required.
|
||||
stream.remoteFlowState(stream.state() == IDLE ?
|
||||
stream.setProperty(stateKey, stream.state() == IDLE ?
|
||||
new ReducedState(stream) :
|
||||
new DefaultState(stream, 0));
|
||||
}
|
||||
@ -69,10 +71,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
public void onStreamActive(Http2Stream stream) {
|
||||
// If the object was previously created, but later activated then we have to ensure
|
||||
// the full state is allocated and the proper initialWindowSize is used.
|
||||
if (stream.remoteFlowState().getClass() == DefaultState.class) {
|
||||
state(stream).window(initialWindowSize);
|
||||
AbstractState state = state(stream);
|
||||
if (state.getClass() == DefaultState.class) {
|
||||
state.window(initialWindowSize);
|
||||
} else {
|
||||
stream.remoteFlowState(new DefaultState(state(stream), initialWindowSize));
|
||||
stream.setProperty(stateKey, new DefaultState(state, initialWindowSize));
|
||||
}
|
||||
}
|
||||
|
||||
@ -87,7 +90,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
// decrease the amount of memory required for this stream because no flow controlled frames can
|
||||
// be exchanged on this stream
|
||||
if (stream.prioritizableForTree() != 0) {
|
||||
stream.remoteFlowState(new ReducedState(state));
|
||||
stream.setProperty(stateKey, new ReducedState(state));
|
||||
}
|
||||
}
|
||||
|
||||
@ -161,6 +164,16 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
return initialWindowSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int windowSize(Http2Stream stream) {
|
||||
return state(stream).windowSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int initialWindowSize(Http2Stream stream) {
|
||||
return state(stream).initialWindowSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception {
|
||||
if (stream.id() == CONNECTION_STREAM_ID) {
|
||||
@ -209,12 +222,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
return state(stream).streamableBytesForTree();
|
||||
}
|
||||
|
||||
private static AbstractState state(Http2Stream stream) {
|
||||
return (AbstractState) checkNotNull(stream, "stream").remoteFlowState();
|
||||
private AbstractState state(Http2Stream stream) {
|
||||
return (AbstractState) checkNotNull(stream, "stream").getProperty(stateKey);
|
||||
}
|
||||
|
||||
private AbstractState connectionState() {
|
||||
return (AbstractState) connection.connectionStream().remoteFlowState();
|
||||
return (AbstractState) connection.connectionStream().getProperty(stateKey);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -261,7 +274,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
* @param connectionWindowSize The connection window this is available for use at this point in the tree.
|
||||
* @return An object summarizing the write and allocation results.
|
||||
*/
|
||||
static int allocateBytesForTree(Http2Stream parent, int connectionWindowSize) throws Http2Exception {
|
||||
int allocateBytesForTree(Http2Stream parent, int connectionWindowSize) throws Http2Exception {
|
||||
AbstractState state = state(parent);
|
||||
if (state.streamableBytesForTree() <= 0) {
|
||||
return 0;
|
||||
@ -289,7 +302,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
* A {@link Http2StreamVisitor} that performs the HTTP/2 priority algorithm to distribute the available connection
|
||||
* window appropriately to the children of a given stream.
|
||||
*/
|
||||
private static final class ChildFeeder implements Http2StreamVisitor {
|
||||
private final class ChildFeeder implements Http2StreamVisitor {
|
||||
final int maxSize;
|
||||
int totalWeight;
|
||||
int connectionWindow;
|
||||
@ -399,7 +412,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
* A simplified version of {@link ChildFeeder} that is only used when all streamable bytes fit within the
|
||||
* available connection window.
|
||||
*/
|
||||
private static final class SimpleChildFeeder implements Http2StreamVisitor {
|
||||
private final class SimpleChildFeeder implements Http2StreamVisitor {
|
||||
int bytesAllocated;
|
||||
int connectionWindow;
|
||||
|
||||
@ -437,35 +450,35 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
// Set to true if cancel() was called.
|
||||
private boolean cancelled;
|
||||
|
||||
public DefaultState(Http2Stream stream, int initialWindowSize) {
|
||||
DefaultState(Http2Stream stream, int initialWindowSize) {
|
||||
super(stream);
|
||||
window(initialWindowSize);
|
||||
pendingWriteQueue = new ArrayDeque<FlowControlled>(2);
|
||||
}
|
||||
|
||||
public DefaultState(AbstractState existingState, int initialWindowSize) {
|
||||
DefaultState(AbstractState existingState, int initialWindowSize) {
|
||||
super(existingState);
|
||||
window(initialWindowSize);
|
||||
pendingWriteQueue = new ArrayDeque<FlowControlled>(2);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int windowSize() {
|
||||
int windowSize() {
|
||||
return window;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int initialWindowSize() {
|
||||
int initialWindowSize() {
|
||||
return initialWindowSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void window(int initialWindowSize) {
|
||||
void window(int initialWindowSize) {
|
||||
window = initialWindowSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void allocate(int bytes) {
|
||||
void allocate(int bytes) {
|
||||
allocated += bytes;
|
||||
// Also artificially reduce the streamable bytes for this tree to give the appearance
|
||||
// that the data has been written. This will be restored before the allocated bytes are
|
||||
@ -474,7 +487,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeAllocatedBytes() {
|
||||
void writeAllocatedBytes() {
|
||||
int numBytes = allocated;
|
||||
|
||||
// Restore the number of streamable bytes to this branch.
|
||||
@ -493,7 +506,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
}
|
||||
|
||||
@Override
|
||||
public int incrementStreamWindow(int delta) throws Http2Exception {
|
||||
int incrementStreamWindow(int delta) throws Http2Exception {
|
||||
if (delta > 0 && Integer.MAX_VALUE - delta < window) {
|
||||
throw streamError(stream.id(), FLOW_CONTROL_ERROR,
|
||||
"Window size overflow for stream: %d", stream.id());
|
||||
@ -510,28 +523,28 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
}
|
||||
|
||||
@Override
|
||||
public int writableWindow() {
|
||||
int writableWindow() {
|
||||
return min(window, connectionWindowSize());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int streamableBytes() {
|
||||
int streamableBytes() {
|
||||
return max(0, min(pendingBytes - allocated, window));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int streamableBytesForTree() {
|
||||
int streamableBytesForTree() {
|
||||
return streamableBytesForTree;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void enqueueFrame(FlowControlled frame) {
|
||||
void enqueueFrame(FlowControlled frame) {
|
||||
incrementPendingBytes(frame.size());
|
||||
pendingWriteQueue.offer(frame);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasFrame() {
|
||||
boolean hasFrame() {
|
||||
return !pendingWriteQueue.isEmpty();
|
||||
}
|
||||
|
||||
@ -543,7 +556,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
void cancel() {
|
||||
cancel(null);
|
||||
}
|
||||
|
||||
@ -568,7 +581,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
}
|
||||
|
||||
@Override
|
||||
public int writeBytes(int bytes) {
|
||||
int writeBytes(int bytes) {
|
||||
int bytesAttempted = 0;
|
||||
while (hasFrame()) {
|
||||
int maxBytes = min(bytes - bytesAttempted, writableWindow());
|
||||
@ -683,94 +696,94 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
* The remote flow control state for a single stream that is not in a state where flow controlled frames cannot
|
||||
* be exchanged.
|
||||
*/
|
||||
private static final class ReducedState extends AbstractState {
|
||||
public ReducedState(Http2Stream stream) {
|
||||
private final class ReducedState extends AbstractState {
|
||||
ReducedState(Http2Stream stream) {
|
||||
super(stream);
|
||||
}
|
||||
|
||||
public ReducedState(AbstractState existingState) {
|
||||
ReducedState(AbstractState existingState) {
|
||||
super(existingState);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int windowSize() {
|
||||
int windowSize() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int initialWindowSize() {
|
||||
int initialWindowSize() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int writableWindow() {
|
||||
int writableWindow() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int streamableBytes() {
|
||||
int streamableBytes() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int streamableBytesForTree() {
|
||||
int streamableBytesForTree() {
|
||||
return streamableBytesForTree;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeAllocatedBytes() {
|
||||
void writeAllocatedBytes() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
void cancel() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void window(int initialWindowSize) {
|
||||
void window(int initialWindowSize) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int incrementStreamWindow(int delta) throws Http2Exception {
|
||||
int incrementStreamWindow(int delta) throws Http2Exception {
|
||||
// This operation needs to be supported during the initial settings exchange when
|
||||
// the peer has not yet acknowledged this peer being activated.
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int writeBytes(int bytes) {
|
||||
int writeBytes(int bytes) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void enqueueFrame(FlowControlled frame) {
|
||||
void enqueueFrame(FlowControlled frame) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void allocate(int bytes) {
|
||||
void allocate(int bytes) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasFrame() {
|
||||
boolean hasFrame() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An abstraction around {@link FlowControlState} which provides specific extensions used by remote flow control.
|
||||
* An abstraction which provides specific extensions used by remote flow control.
|
||||
*/
|
||||
private abstract static class AbstractState implements FlowControlState {
|
||||
private abstract class AbstractState {
|
||||
protected final Http2Stream stream;
|
||||
protected int streamableBytesForTree;
|
||||
|
||||
public AbstractState(Http2Stream stream) {
|
||||
AbstractState(Http2Stream stream) {
|
||||
this.stream = stream;
|
||||
}
|
||||
|
||||
public AbstractState(AbstractState existingState) {
|
||||
AbstractState(AbstractState existingState) {
|
||||
this.stream = existingState.stream();
|
||||
this.streamableBytesForTree = existingState.streamableBytesForTree();
|
||||
}
|
||||
@ -778,7 +791,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
/**
|
||||
* The stream this state is associated with.
|
||||
*/
|
||||
public final Http2Stream stream() {
|
||||
final Http2Stream stream() {
|
||||
return stream;
|
||||
}
|
||||
|
||||
@ -786,72 +799,76 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
* Recursively increments the {@link #streamableBytesForTree()} for this branch in the priority tree starting
|
||||
* at the current node.
|
||||
*/
|
||||
public final void incrementStreamableBytesForTree(int numBytes) {
|
||||
final void incrementStreamableBytesForTree(int numBytes) {
|
||||
streamableBytesForTree += numBytes;
|
||||
if (!stream.isRoot()) {
|
||||
state(stream.parent()).incrementStreamableBytesForTree(numBytes);
|
||||
}
|
||||
}
|
||||
|
||||
abstract int windowSize();
|
||||
|
||||
abstract int initialWindowSize();
|
||||
|
||||
/**
|
||||
* Write bytes allocated bytes for this stream.
|
||||
*/
|
||||
public abstract void writeAllocatedBytes();
|
||||
abstract void writeAllocatedBytes();
|
||||
|
||||
/**
|
||||
* Returns the number of pending bytes for this node that will fit within the
|
||||
* {@link #availableWindow()}. This is used for the priority algorithm to determine the aggregate
|
||||
* {@link #writableWindow()}. This is used for the priority algorithm to determine the aggregate
|
||||
* number of bytes that can be written at each node. Each node only takes into account its
|
||||
* stream window so that when a change occurs to the connection window, these values need
|
||||
* not change (i.e. no tree traversal is required).
|
||||
*/
|
||||
public abstract int streamableBytes();
|
||||
abstract int streamableBytes();
|
||||
|
||||
/**
|
||||
* Get the {@link #streamableBytes()} for the entire tree rooted at this node.
|
||||
*/
|
||||
public abstract int streamableBytesForTree();
|
||||
abstract int streamableBytesForTree();
|
||||
|
||||
/**
|
||||
* Any operations that may be pending are cleared and the status of these operations is failed.
|
||||
*/
|
||||
public abstract void cancel();
|
||||
abstract void cancel();
|
||||
|
||||
/**
|
||||
* Reset the {@link #availableWindow()} size.
|
||||
* Reset the window size for this stream.
|
||||
*/
|
||||
public abstract void window(int initialWindowSize);
|
||||
abstract void window(int initialWindowSize);
|
||||
|
||||
/**
|
||||
* Increments the flow control window for this stream by the given delta and returns the new value.
|
||||
*/
|
||||
public abstract int incrementStreamWindow(int delta) throws Http2Exception;
|
||||
abstract int incrementStreamWindow(int delta) throws Http2Exception;
|
||||
|
||||
/**
|
||||
* Returns the maximum writable window (minimum of the stream and connection windows).
|
||||
*/
|
||||
public abstract int writableWindow();
|
||||
abstract int writableWindow();
|
||||
|
||||
/**
|
||||
* Writes up to the number of bytes from the pending queue. May write less if limited by the writable window, by
|
||||
* the number of pending writes available, or because a frame does not support splitting on arbitrary
|
||||
* boundaries.
|
||||
*/
|
||||
public abstract int writeBytes(int bytes);
|
||||
abstract int writeBytes(int bytes);
|
||||
|
||||
/**
|
||||
* Adds the {@code frame} to the pending queue and increments the pending byte count.
|
||||
*/
|
||||
public abstract void enqueueFrame(FlowControlled frame);
|
||||
abstract void enqueueFrame(FlowControlled frame);
|
||||
|
||||
/**
|
||||
* Increment the number of bytes allocated to this stream by the priority algorithm
|
||||
*/
|
||||
public abstract void allocate(int bytes);
|
||||
abstract void allocate(int bytes);
|
||||
|
||||
/**
|
||||
* Indicates whether or not there are frames in the pending queue.
|
||||
*/
|
||||
public abstract boolean hasFrame();
|
||||
abstract boolean hasFrame();
|
||||
}
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
|
||||
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2Exception.streamError;
|
||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
@ -38,19 +39,11 @@ import io.netty.util.ByteString;
|
||||
* stream. The decompression provided by this class will be applied to the data for the entire stream.
|
||||
*/
|
||||
public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecorator {
|
||||
private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() {
|
||||
@Override
|
||||
public void onStreamRemoved(Http2Stream stream) {
|
||||
final Http2Decompressor decompressor = decompressor(stream);
|
||||
if (decompressor != null) {
|
||||
cleanup(stream, decompressor);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
private final Http2Connection connection;
|
||||
private final boolean strict;
|
||||
private boolean flowControllerInitialized;
|
||||
final Http2Connection.PropertyKey propertyKey;
|
||||
|
||||
public DelegatingDecompressorFrameListener(Http2Connection connection, Http2FrameListener listener) {
|
||||
this(connection, listener, true);
|
||||
@ -62,7 +55,16 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
|
||||
this.connection = connection;
|
||||
this.strict = strict;
|
||||
|
||||
connection.addListener(CLEAN_UP_LISTENER);
|
||||
propertyKey = connection.newKey();
|
||||
connection.addListener(new Http2ConnectionAdapter() {
|
||||
@Override
|
||||
public void onStreamRemoved(Http2Stream stream) {
|
||||
final Http2Decompressor decompressor = decompressor(stream);
|
||||
if (decompressor != null) {
|
||||
cleanup(stream, decompressor);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -210,7 +212,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
|
||||
final EmbeddedChannel channel = newContentDecompressor(contentEncoding);
|
||||
if (channel != null) {
|
||||
decompressor = new Http2Decompressor(channel);
|
||||
stream.setProperty(Http2Decompressor.class, decompressor);
|
||||
stream.setProperty(propertyKey, decompressor);
|
||||
// Decode the content and remove or replace the existing headers
|
||||
// so that the message looks like a decoded message.
|
||||
ByteString targetContentEncoding = getTargetContentEncoding(contentEncoding);
|
||||
@ -237,8 +239,8 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
|
||||
}
|
||||
}
|
||||
|
||||
private static Http2Decompressor decompressor(Http2Stream stream) {
|
||||
return (Http2Decompressor) (stream == null? null : stream.getProperty(Http2Decompressor.class));
|
||||
Http2Decompressor decompressor(Http2Stream stream) {
|
||||
return stream == null ? null : (Http2Decompressor) stream.getProperty(propertyKey);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -248,7 +250,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
|
||||
* @param stream The stream for which {@code decompressor} is the decompressor for
|
||||
* @param decompressor The decompressor for {@code stream}
|
||||
*/
|
||||
private static void cleanup(Http2Stream stream, Http2Decompressor decompressor) {
|
||||
private void cleanup(Http2Stream stream, Http2Decompressor decompressor) {
|
||||
final EmbeddedChannel channel = decompressor.decompressor();
|
||||
if (channel.finish()) {
|
||||
for (;;) {
|
||||
@ -259,7 +261,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
|
||||
buf.release();
|
||||
}
|
||||
}
|
||||
decompressor = stream.removeProperty(Http2Decompressor.class);
|
||||
decompressor = stream.removeProperty(propertyKey);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -286,7 +288,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
|
||||
/**
|
||||
* A decorator around the local flow controller that converts consumed bytes from uncompressed to compressed.
|
||||
*/
|
||||
private static final class ConsumedBytesConverter implements Http2LocalFlowController {
|
||||
private final class ConsumedBytesConverter implements Http2LocalFlowController {
|
||||
private final Http2LocalFlowController flowController;
|
||||
|
||||
ConsumedBytesConverter(Http2LocalFlowController flowController) {
|
||||
@ -303,6 +305,16 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
|
||||
return flowController.initialWindowSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int windowSize(Http2Stream stream) {
|
||||
return flowController.windowSize(stream);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int initialWindowSize(Http2Stream stream) {
|
||||
return flowController.initialWindowSize(stream);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta)
|
||||
throws Http2Exception {
|
||||
@ -330,12 +342,12 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
|
||||
flowController.consumeBytes(ctx, stream, numBytes);
|
||||
} catch (Http2Exception e) {
|
||||
if (copy != null) {
|
||||
stream.setProperty(Http2Decompressor.class, copy);
|
||||
stream.setProperty(propertyKey, copy);
|
||||
}
|
||||
throw e;
|
||||
} catch (Throwable t) {
|
||||
if (copy != null) {
|
||||
stream.setProperty(Http2Decompressor.class, copy);
|
||||
stream.setProperty(propertyKey, copy);
|
||||
}
|
||||
throw new Http2Exception(INTERNAL_ERROR,
|
||||
"Error while returning bytes to flow control window", t);
|
||||
|
@ -278,6 +278,17 @@ public interface Http2Connection {
|
||||
Endpoint<? extends Http2FlowController> opposite();
|
||||
}
|
||||
|
||||
/**
|
||||
* A key to be used for associating application-defined properties with streams within this connection.
|
||||
*/
|
||||
interface PropertyKey {
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new key that is unique within this {@link Http2Connection}.
|
||||
*/
|
||||
PropertyKey newKey();
|
||||
|
||||
/**
|
||||
* Adds a listener of stream life-cycle events.
|
||||
*/
|
||||
|
@ -22,8 +22,8 @@ import io.netty.channel.ChannelHandlerContext;
|
||||
public interface Http2FlowController {
|
||||
|
||||
/**
|
||||
* Sets the initial flow control window and updates all stream windows (but not the connection
|
||||
* window) by the delta.
|
||||
* Sets the connection-wide initial flow control window and updates all stream windows (but not the connection
|
||||
* stream window) by the delta.
|
||||
* <p>
|
||||
* This method is used to apply the {@code SETTINGS_INITIAL_WINDOW_SIZE} value for an
|
||||
* {@code SETTINGS} frame.
|
||||
@ -34,11 +34,24 @@ public interface Http2FlowController {
|
||||
void initialWindowSize(int newWindowSize) throws Http2Exception;
|
||||
|
||||
/**
|
||||
* Gets the initial flow control window size that is used as the basis for new stream flow
|
||||
* Gets the connection-wide initial flow control window size that is used as the basis for new stream flow
|
||||
* control windows.
|
||||
*/
|
||||
int initialWindowSize();
|
||||
|
||||
/**
|
||||
* Get the portion of the flow control window for the given stream that is currently available for sending/receiving
|
||||
* frames which are subject to flow control. This quantity is measured in number of bytes.
|
||||
*/
|
||||
int windowSize(Http2Stream stream);
|
||||
|
||||
/**
|
||||
* Get the initial flow control window size for the given stream. This quantity is measured in number of bytes. Note
|
||||
* the unavailable window portion can be calculated by {@link #initialWindowSize()} - {@link
|
||||
* #windowSize(Http2Stream)}.
|
||||
*/
|
||||
int initialWindowSize(Http2Stream stream);
|
||||
|
||||
/**
|
||||
* Increments the size of the stream's flow control window by the given delta.
|
||||
* <p>
|
||||
|
@ -33,24 +33,6 @@ public interface Http2Stream {
|
||||
CLOSED
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents the state which flow controller implementations are expected to track.
|
||||
*/
|
||||
interface FlowControlState {
|
||||
/**
|
||||
* Get the portion of the flow control window that is available for sending/receiving frames which are subject
|
||||
* to flow control. This quantity is measured in number of bytes.
|
||||
*/
|
||||
int windowSize();
|
||||
|
||||
/**
|
||||
* Get the initial flow control window size. This quantity is measured in number of bytes.
|
||||
* Note the unavailable window portion can be calculated by
|
||||
* {@link #initialWindowSize()} - {@link #windowSize()}.
|
||||
*/
|
||||
int initialWindowSize();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the unique identifier for this stream within the connection.
|
||||
*/
|
||||
@ -61,26 +43,6 @@ public interface Http2Stream {
|
||||
*/
|
||||
State state();
|
||||
|
||||
/**
|
||||
* Get the state as related to the {@link Http2LocalFlowController}.
|
||||
*/
|
||||
FlowControlState localFlowState();
|
||||
|
||||
/**
|
||||
* Set the state as related to the {@link Http2LocalFlowController}.
|
||||
*/
|
||||
void localFlowState(FlowControlState state);
|
||||
|
||||
/**
|
||||
* Get the state as related to {@link Http2RemoteFlowController}.
|
||||
*/
|
||||
FlowControlState remoteFlowState();
|
||||
|
||||
/**
|
||||
* Set the state as related to {@link Http2RemoteFlowController}.
|
||||
*/
|
||||
void remoteFlowState(FlowControlState state);
|
||||
|
||||
/**
|
||||
* Opens this stream, making it available via {@link Http2Connection#forEachActiveStream(Http2StreamVisitor)} and
|
||||
* transition state to:
|
||||
@ -140,17 +102,17 @@ public interface Http2Stream {
|
||||
* Associates the application-defined data with this stream.
|
||||
* @return The value that was previously associated with {@code key}, or {@code null} if there was none.
|
||||
*/
|
||||
Object setProperty(Object key, Object value);
|
||||
<V> V setProperty(Http2Connection.PropertyKey key, V value);
|
||||
|
||||
/**
|
||||
* Returns application-defined data if any was associated with this stream.
|
||||
*/
|
||||
<V> V getProperty(Object key);
|
||||
<V> V getProperty(Http2Connection.PropertyKey key);
|
||||
|
||||
/**
|
||||
* Returns and removes application-defined data if any was associated with this stream.
|
||||
*/
|
||||
<V> V removeProperty(Object key);
|
||||
<V> V removeProperty(Http2Connection.PropertyKey key);
|
||||
|
||||
/**
|
||||
* Updates an priority for this stream. Calling this method may affect the straucture of the
|
||||
|
@ -67,6 +67,7 @@ public class DefaultHttp2LocalFlowControllerTest {
|
||||
|
||||
connection = new DefaultHttp2Connection(false);
|
||||
controller = new DefaultHttp2LocalFlowController(connection, frameWriter, updateRatio);
|
||||
connection.local().flowController(controller);
|
||||
|
||||
connection.local().createStream(STREAM_ID, false);
|
||||
}
|
||||
@ -320,7 +321,7 @@ public class DefaultHttp2LocalFlowControllerTest {
|
||||
}
|
||||
|
||||
private int window(int streamId) throws Http2Exception {
|
||||
return stream(streamId).localFlowState().windowSize();
|
||||
return controller.windowSize(stream(streamId));
|
||||
}
|
||||
|
||||
private Http2Stream stream(int streamId) {
|
||||
|
@ -86,6 +86,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
connection = new DefaultHttp2Connection(false);
|
||||
controller = new DefaultHttp2RemoteFlowController(connection);
|
||||
connection.remote().flowController(controller);
|
||||
|
||||
connection.local().createStream(STREAM_A, false);
|
||||
connection.local().createStream(STREAM_B, false);
|
||||
@ -1222,7 +1223,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
}
|
||||
|
||||
private int window(int streamId) throws Http2Exception {
|
||||
return stream(streamId).remoteFlowState().windowSize();
|
||||
return controller.windowSize(stream(streamId));
|
||||
}
|
||||
|
||||
private void incrementWindowSize(int streamId, int delta) throws Http2Exception {
|
||||
|
@ -15,6 +15,7 @@
|
||||
package io.netty.microbench.http2;
|
||||
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.http2.Http2Exception;
|
||||
@ -35,6 +36,16 @@ public final class NoopHttp2LocalFlowController implements Http2LocalFlowControl
|
||||
return MAX_INITIAL_WINDOW_SIZE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int windowSize(Http2Stream stream) {
|
||||
return MAX_INITIAL_WINDOW_SIZE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int initialWindowSize(Http2Stream stream) {
|
||||
return MAX_INITIAL_WINDOW_SIZE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta)
|
||||
throws Http2Exception {
|
||||
|
@ -15,6 +15,7 @@
|
||||
package io.netty.microbench.http2;
|
||||
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.http2.Http2Exception;
|
||||
import io.netty.handler.codec.http2.Http2RemoteFlowController;
|
||||
@ -24,6 +25,7 @@ public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowContr
|
||||
public static final NoopHttp2RemoteFlowController INSTANCE = new NoopHttp2RemoteFlowController();
|
||||
|
||||
private NoopHttp2RemoteFlowController() { }
|
||||
|
||||
@Override
|
||||
public void initialWindowSize(int newWindowSize) throws Http2Exception {
|
||||
}
|
||||
@ -33,6 +35,16 @@ public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowContr
|
||||
return MAX_INITIAL_WINDOW_SIZE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int windowSize(Http2Stream stream) {
|
||||
return MAX_INITIAL_WINDOW_SIZE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int initialWindowSize(Http2Stream stream) {
|
||||
return MAX_INITIAL_WINDOW_SIZE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta)
|
||||
throws Http2Exception {
|
||||
|
Loading…
x
Reference in New Issue
Block a user