Optimizing user-defined stream properties.

Motivation:

Streams currently maintain a hash map of user-defined properties, which has been shown to add significant memory overhead as well as being a performance bottleneck for lookup of frequently used properties.

Modifications:

Modifying the connection/stream to use an array as the storage of user-defined properties, indexed by the class that identifies the index into the array where the property is stored.

Result:

Stream processing performance should be improved.
This commit is contained in:
nmittler 2015-04-06 12:55:20 -07:00
parent b426fb1618
commit 70a2608325
12 changed files with 316 additions and 265 deletions

View File

@ -38,16 +38,6 @@ import io.netty.util.ByteString;
* stream. The compression provided by this class will be applied to the data for the entire stream.
*/
public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() {
@Override
public void onStreamRemoved(Http2Stream stream) {
final EmbeddedChannel compressor = stream.getProperty(CompressorHttp2ConnectionEncoder.class);
if (compressor != null) {
cleanup(stream, compressor);
}
}
};
public static final int DEFAULT_COMPRESSION_LEVEL = 6;
public static final int DEFAULT_WINDOW_BITS = 15;
public static final int DEFAULT_MEM_LEVEL = 8;
@ -55,6 +45,7 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE
private final int compressionLevel;
private final int windowBits;
private final int memLevel;
private final Http2Connection.PropertyKey propertyKey;
public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate) {
this(delegate, DEFAULT_COMPRESSION_LEVEL, DEFAULT_WINDOW_BITS, DEFAULT_MEM_LEVEL);
@ -76,15 +67,23 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE
this.windowBits = windowBits;
this.memLevel = memLevel;
connection().addListener(CLEAN_UP_LISTENER);
propertyKey = connection().newKey();
connection().addListener(new Http2ConnectionAdapter() {
@Override
public void onStreamRemoved(Http2Stream stream) {
final EmbeddedChannel compressor = stream.getProperty(propertyKey);
if (compressor != null) {
cleanup(stream, compressor);
}
}
});
}
@Override
public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
final boolean endOfStream, ChannelPromise promise) {
final Http2Stream stream = connection().stream(streamId);
final EmbeddedChannel channel = stream == null ? null :
(EmbeddedChannel) stream.getProperty(CompressorHttp2ConnectionEncoder.class);
final EmbeddedChannel channel = stream == null ? null : (EmbeddedChannel) stream.getProperty(propertyKey);
if (channel == null) {
// The compressor may be null if no compatible encoding type was found in this stream's headers
return super.writeData(ctx, streamId, data, padding, endOfStream, promise);
@ -216,7 +215,7 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE
return;
}
EmbeddedChannel compressor = stream.getProperty(CompressorHttp2ConnectionEncoder.class);
EmbeddedChannel compressor = stream.getProperty(propertyKey);
if (compressor == null) {
if (!endOfStream) {
ByteString encoding = headers.get(CONTENT_ENCODING);
@ -226,7 +225,7 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE
try {
compressor = newContentCompressor(encoding);
if (compressor != null) {
stream.setProperty(CompressorHttp2ConnectionEncoder.class, compressor);
stream.setProperty(propertyKey, compressor);
ByteString targetContentEncoding = getTargetContentEncoding(encoding);
if (IDENTITY.equals(targetContentEncoding)) {
headers.remove(CONTENT_ENCODING);
@ -256,7 +255,7 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE
* @param stream The stream for which {@code compressor} is the compressor for
* @param compressor The compressor for {@code stream}
*/
private static void cleanup(Http2Stream stream, EmbeddedChannel compressor) {
void cleanup(Http2Stream stream, EmbeddedChannel compressor) {
if (compressor.finish()) {
for (;;) {
final ByteBuf buf = compressor.readOutbound();
@ -267,7 +266,7 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE
buf.release();
}
}
stream.removeProperty(CompressorHttp2ConnectionEncoder.class);
stream.removeProperty(propertyKey);
}
/**

View File

@ -37,6 +37,7 @@ import io.netty.handler.codec.http2.Http2Stream.State;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.collection.PrimitiveCollections;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
@ -44,11 +45,10 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
@ -59,9 +59,11 @@ public class DefaultHttp2Connection implements Http2Connection {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2Connection.class);
// Fields accessed by inner classes
final IntObjectMap<Http2Stream> streamMap = new IntObjectHashMap<Http2Stream>();
final PropertyKeyRegistry propertyKeyRegistry = new PropertyKeyRegistry();
final ConnectionStream connectionStream = new ConnectionStream();
final DefaultEndpoint<Http2LocalFlowController> localEndpoint;
final DefaultEndpoint<Http2RemoteFlowController> remoteEndpoint;
/**
* The initial size of the children map is chosen to be conservative on initial memory allocations under
* the assumption that most streams will have a small number of children. This choice may be
@ -265,11 +267,28 @@ public class DefaultHttp2Connection implements Http2Connection {
}
}
@Override
public PropertyKey newKey() {
return propertyKeyRegistry.newKey();
}
/**
* Verifies that the key is valid and returns it as the internal {@link DefaultPropertyKey} type.
*
* @throws NullPointerException if the key is {@code null}.
* @throws ClassCastException if the key is not of type {@link DefaultPropertyKey}.
* @throws IllegalArgumentException if the key was not created by this connection.
*/
final DefaultPropertyKey verifyKey(PropertyKey key) {
return checkNotNull((DefaultPropertyKey) key, "key").verifyConnection(this);
}
/**
* Simple stream implementation. Streams can be compared to each other by priority.
*/
private class DefaultStream implements Http2Stream {
private final int id;
private final PropertyMap properties = new PropertyMap();
private State state;
private short weight = DEFAULT_PRIORITY_WEIGHT;
private DefaultStream parent;
@ -277,14 +296,10 @@ public class DefaultHttp2Connection implements Http2Connection {
private int totalChildWeights;
private int prioritizableForTree = 1;
private boolean resetSent;
private PropertyMap data;
private FlowControlState localFlowState;
private FlowControlState remoteFlowState;
DefaultStream(int id, State state) {
this.id = id;
this.state = state;
data = new LazyPropertyMap(this);
}
@Override
@ -297,26 +312,6 @@ public class DefaultHttp2Connection implements Http2Connection {
return state;
}
@Override
public FlowControlState localFlowState() {
return localFlowState;
}
@Override
public void localFlowState(FlowControlState state) {
localFlowState = state;
}
@Override
public FlowControlState remoteFlowState() {
return remoteFlowState;
}
@Override
public void remoteFlowState(FlowControlState state) {
remoteFlowState = state;
}
@Override
public boolean isResetSent() {
return resetSent;
@ -329,18 +324,18 @@ public class DefaultHttp2Connection implements Http2Connection {
}
@Override
public final Object setProperty(Object key, Object value) {
return data.put(key, value);
public final <V> V setProperty(PropertyKey key, V value) {
return properties.add(verifyKey(key), value);
}
@Override
public final <V> V getProperty(Object key) {
return data.get(key);
public final <V> V getProperty(PropertyKey key) {
return properties.get(verifyKey(key));
}
@Override
public final <V> V removeProperty(Object key) {
return data.remove(key);
public final <V> V removeProperty(PropertyKey key) {
return properties.remove(verifyKey(key));
}
@Override
@ -694,74 +689,44 @@ public class DefaultHttp2Connection implements Http2Connection {
}
return false;
}
}
/**
* Allows the data map to be lazily initialized for {@link DefaultStream}.
*/
private interface PropertyMap {
Object put(Object key, Object value);
<V> V get(Object key);
<V> V remove(Object key);
}
/**
* Provides actual {@link HashMap} functionality for {@link DefaultStream}'s application data.
*/
private static final class DefaultProperyMap implements PropertyMap {
private final Map<Object, Object> data;
DefaultProperyMap(int initialSize) {
data = new HashMap<Object, Object>(initialSize);
}
@Override
public Object put(Object key, Object value) {
return data.put(key, value);
}
@SuppressWarnings("unchecked")
@Override
public <V> V get(Object key) {
return (V) data.get(key);
}
@SuppressWarnings("unchecked")
@Override
public <V> V remove(Object key) {
return (V) data.remove(key);
}
}
/**
* Provides the lazy initialization for the {@link DefaultStream} data map.
*/
private static final class LazyPropertyMap implements PropertyMap {
private static final int DEFAULT_INITIAL_SIZE = 4;
private final DefaultStream stream;
private class PropertyMap {
Object[] values = EmptyArrays.EMPTY_OBJECTS;
LazyPropertyMap(DefaultStream stream) {
this.stream = stream;
<V> V add(DefaultPropertyKey key, V value) {
resizeIfNecessary(key.index);
@SuppressWarnings("unchecked")
V prevValue = (V) values[key.index];
values[key.index] = value;
return prevValue;
}
@Override
public Object put(Object key, Object value) {
stream.data = new DefaultProperyMap(DEFAULT_INITIAL_SIZE);
return stream.data.put(key, value);
@SuppressWarnings("unchecked")
<V> V get(DefaultPropertyKey key) {
if (key.index >= values.length) {
return null;
}
return (V) values[key.index];
}
@Override
public <V> V get(Object key) {
stream.data = new DefaultProperyMap(DEFAULT_INITIAL_SIZE);
return stream.data.get(key);
@SuppressWarnings("unchecked")
<V> V remove(DefaultPropertyKey key) {
V prevValue = null;
if (key.index < values.length) {
prevValue = (V) values[key.index];
values[key.index] = null;
}
return prevValue;
}
@Override
public <V> V remove(Object key) {
stream.data = new DefaultProperyMap(DEFAULT_INITIAL_SIZE);
return stream.data.remove(key);
void resizeIfNecessary(int index) {
if (index >= values.length) {
values = Arrays.copyOf(values, propertyKeyRegistry.size());
}
}
}
}
@ -1190,4 +1155,43 @@ public class DefaultHttp2Connection implements Http2Connection {
return pendingIterations == 0;
}
}
/**
* Implementation of {@link PropertyKey} that specifies the index position of the property.
*/
final class DefaultPropertyKey implements PropertyKey {
private final int index;
DefaultPropertyKey(int index) {
this.index = index;
}
DefaultPropertyKey verifyConnection(Http2Connection connection) {
if (connection != DefaultHttp2Connection.this) {
throw new IllegalArgumentException("Using a key that was not created by this connection");
}
return this;
}
}
/**
* A registry of all stream property keys known by this connection.
*/
private class PropertyKeyRegistry {
final List<DefaultPropertyKey> keys = new ArrayList<DefaultPropertyKey>(4);
/**
* Registers a new property key.
*/
@SuppressWarnings("unchecked")
DefaultPropertyKey newKey() {
DefaultPropertyKey key = new DefaultPropertyKey(keys.size());
keys.add(key);
return key;
}
int size() {
return keys.size();
}
}
}

View File

@ -26,11 +26,11 @@ import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
import static java.lang.Math.min;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.handler.codec.http2.Http2Stream.FlowControlState;
import io.netty.util.internal.PlatformDependent;
/**
@ -45,6 +45,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
private final Http2Connection connection;
private final Http2FrameWriter frameWriter;
private final Http2Connection.PropertyKey stateKey;
private ChannelHandlerContext ctx;
private volatile float windowUpdateRatio;
private volatile int initialWindowSize = DEFAULT_WINDOW_SIZE;
@ -60,8 +61,9 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
windowUpdateRatio(windowUpdateRatio);
// Add a flow state for the connection.
connection.connectionStream().localFlowState(
new DefaultState(connection.connectionStream(), initialWindowSize));
stateKey = connection.newKey();
connection.connectionStream()
.setProperty(stateKey, new DefaultState(connection.connectionStream(), initialWindowSize));
// Register for notification of new streams.
connection.addListener(new Http2ConnectionAdapter() {
@ -69,14 +71,14 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
public void onStreamAdded(Http2Stream stream) {
// Unconditionally used the reduced flow control state because it requires no object allocation
// and the DefaultFlowState will be allocated in onStreamActive.
stream.localFlowState(REDUCED_FLOW_STATE);
stream.setProperty(stateKey, REDUCED_FLOW_STATE);
}
@Override
public void onStreamActive(Http2Stream stream) {
// Need to be sure the stream's initial window is adjusted for SETTINGS
// frames which may have been exchanged while it was in IDLE
stream.localFlowState(new DefaultState(stream, initialWindowSize));
stream.setProperty(stateKey, new DefaultState(stream, initialWindowSize));
}
@Override
@ -96,7 +98,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
// Unconditionally reduce the amount of memory required for flow control because there is no
// object allocation costs associated with doing so and the stream will not have any more
// local flow control state to keep track of anymore.
stream.localFlowState(REDUCED_FLOW_STATE);
stream.setProperty(stateKey, REDUCED_FLOW_STATE);
}
}
});
@ -117,6 +119,16 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
return initialWindowSize;
}
@Override
public int windowSize(Http2Stream stream) {
return state(stream).windowSize();
}
@Override
public int initialWindowSize(Http2Stream stream) {
return state(stream).initialWindowSize();
}
@Override
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception {
checkNotNull(ctx, "ctx");
@ -230,11 +242,12 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
}
private FlowState connectionState() {
return (FlowState) connection.connectionStream().localFlowState();
return connection.connectionStream().getProperty(stateKey);
}
private static FlowState state(Http2Stream stream) {
return (FlowState) checkNotNull(stream, "stream").localFlowState();
private FlowState state(Http2Stream stream) {
checkNotNull(stream, "stream");
return stream.getProperty(stateKey);
}
private static boolean isClosed(Http2Stream stream) {
@ -352,8 +365,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
}
}
@Override
public void returnProcessedBytes(int delta) throws Http2Exception {
private void returnProcessedBytes(int delta) throws Http2Exception {
if (processedWindow - delta < window) {
throw streamError(stream.id(), INTERNAL_ERROR,
"Attempting to return too many bytes for stream %d", stream.id());
@ -410,6 +422,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
* be exchanged.
*/
private static final FlowState REDUCED_FLOW_STATE = new FlowState() {
@Override
public int windowSize() {
return 0;
@ -466,11 +479,6 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
// the peer has not yet acknowledged this peer being activated.
}
@Override
public void returnProcessedBytes(int delta) throws Http2Exception {
throw new UnsupportedOperationException();
}
@Override
public void endOfStream(boolean endOfStream) {
throw new UnsupportedOperationException();
@ -478,9 +486,14 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
};
/**
* An abstraction around {@link FlowControlState} which provides specific extensions used by local flow control.
* An abstraction which provides specific extensions used by local flow control.
*/
private interface FlowState extends FlowControlState {
private interface FlowState {
int windowSize();
int initialWindowSize();
void window(int initialWindowSize);
/**
@ -516,18 +529,13 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
*/
void incrementFlowControlWindows(int delta) throws Http2Exception;
/**
* Returns the processed bytes for this stream.
*/
void returnProcessedBytes(int delta) throws Http2Exception;
void endOfStream(boolean endOfStream);
}
/**
* Provides a means to iterate over all active streams and increment the flow control windows.
*/
private static final class WindowUpdateVisitor implements Http2StreamVisitor {
private final class WindowUpdateVisitor implements Http2StreamVisitor {
private CompositeStreamException compositeException;
private final int delta;

View File

@ -18,13 +18,13 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
import static java.lang.Math.min;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Stream.FlowControlState;
import io.netty.handler.codec.http2.Http2Stream.State;
import java.util.ArrayDeque;
@ -35,7 +35,7 @@ import java.util.Deque;
* Basic implementation of {@link Http2RemoteFlowController}.
*/
public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController {
private static final Http2StreamVisitor WRITE_ALLOCATED_BYTES = new Http2StreamVisitor() {
private final Http2StreamVisitor WRITE_ALLOCATED_BYTES = new Http2StreamVisitor() {
@Override
public boolean visit(Http2Stream stream) {
state(stream).writeAllocatedBytes();
@ -43,6 +43,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
};
private final Http2Connection connection;
private final Http2Connection.PropertyKey stateKey;
private int initialWindowSize = DEFAULT_WINDOW_SIZE;
private ChannelHandlerContext ctx;
private boolean needFlush;
@ -51,7 +52,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
this.connection = checkNotNull(connection, "connection");
// Add a flow state for the connection.
connection.connectionStream().remoteFlowState(
stateKey = connection.newKey();
connection.connectionStream().setProperty(stateKey,
new DefaultState(connection.connectionStream(), initialWindowSize));
// Register for notification of new streams.
@ -60,7 +62,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
public void onStreamAdded(Http2Stream stream) {
// If the stream state is not open then the stream is not yet eligible for flow controlled frames and
// only requires the ReducedFlowState. Otherwise the full amount of memory is required.
stream.remoteFlowState(stream.state() == IDLE ?
stream.setProperty(stateKey, stream.state() == IDLE ?
new ReducedState(stream) :
new DefaultState(stream, 0));
}
@ -69,10 +71,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
public void onStreamActive(Http2Stream stream) {
// If the object was previously created, but later activated then we have to ensure
// the full state is allocated and the proper initialWindowSize is used.
if (stream.remoteFlowState().getClass() == DefaultState.class) {
state(stream).window(initialWindowSize);
AbstractState state = state(stream);
if (state.getClass() == DefaultState.class) {
state.window(initialWindowSize);
} else {
stream.remoteFlowState(new DefaultState(state(stream), initialWindowSize));
stream.setProperty(stateKey, new DefaultState(state, initialWindowSize));
}
}
@ -87,7 +90,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
// decrease the amount of memory required for this stream because no flow controlled frames can
// be exchanged on this stream
if (stream.prioritizableForTree() != 0) {
stream.remoteFlowState(new ReducedState(state));
stream.setProperty(stateKey, new ReducedState(state));
}
}
@ -161,6 +164,16 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return initialWindowSize;
}
@Override
public int windowSize(Http2Stream stream) {
return state(stream).windowSize();
}
@Override
public int initialWindowSize(Http2Stream stream) {
return state(stream).initialWindowSize();
}
@Override
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception {
if (stream.id() == CONNECTION_STREAM_ID) {
@ -209,12 +222,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return state(stream).streamableBytesForTree();
}
private static AbstractState state(Http2Stream stream) {
return (AbstractState) checkNotNull(stream, "stream").remoteFlowState();
private AbstractState state(Http2Stream stream) {
return (AbstractState) checkNotNull(stream, "stream").getProperty(stateKey);
}
private AbstractState connectionState() {
return (AbstractState) connection.connectionStream().remoteFlowState();
return (AbstractState) connection.connectionStream().getProperty(stateKey);
}
/**
@ -261,7 +274,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* @param connectionWindowSize The connection window this is available for use at this point in the tree.
* @return An object summarizing the write and allocation results.
*/
static int allocateBytesForTree(Http2Stream parent, int connectionWindowSize) throws Http2Exception {
int allocateBytesForTree(Http2Stream parent, int connectionWindowSize) throws Http2Exception {
AbstractState state = state(parent);
if (state.streamableBytesForTree() <= 0) {
return 0;
@ -289,7 +302,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* A {@link Http2StreamVisitor} that performs the HTTP/2 priority algorithm to distribute the available connection
* window appropriately to the children of a given stream.
*/
private static final class ChildFeeder implements Http2StreamVisitor {
private final class ChildFeeder implements Http2StreamVisitor {
final int maxSize;
int totalWeight;
int connectionWindow;
@ -399,7 +412,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* A simplified version of {@link ChildFeeder} that is only used when all streamable bytes fit within the
* available connection window.
*/
private static final class SimpleChildFeeder implements Http2StreamVisitor {
private final class SimpleChildFeeder implements Http2StreamVisitor {
int bytesAllocated;
int connectionWindow;
@ -437,35 +450,35 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
// Set to true if cancel() was called.
private boolean cancelled;
public DefaultState(Http2Stream stream, int initialWindowSize) {
DefaultState(Http2Stream stream, int initialWindowSize) {
super(stream);
window(initialWindowSize);
pendingWriteQueue = new ArrayDeque<FlowControlled>(2);
}
public DefaultState(AbstractState existingState, int initialWindowSize) {
DefaultState(AbstractState existingState, int initialWindowSize) {
super(existingState);
window(initialWindowSize);
pendingWriteQueue = new ArrayDeque<FlowControlled>(2);
}
@Override
public int windowSize() {
int windowSize() {
return window;
}
@Override
public int initialWindowSize() {
int initialWindowSize() {
return initialWindowSize;
}
@Override
public void window(int initialWindowSize) {
void window(int initialWindowSize) {
window = initialWindowSize;
}
@Override
public void allocate(int bytes) {
void allocate(int bytes) {
allocated += bytes;
// Also artificially reduce the streamable bytes for this tree to give the appearance
// that the data has been written. This will be restored before the allocated bytes are
@ -474,7 +487,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
@Override
public void writeAllocatedBytes() {
void writeAllocatedBytes() {
int numBytes = allocated;
// Restore the number of streamable bytes to this branch.
@ -493,7 +506,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
@Override
public int incrementStreamWindow(int delta) throws Http2Exception {
int incrementStreamWindow(int delta) throws Http2Exception {
if (delta > 0 && Integer.MAX_VALUE - delta < window) {
throw streamError(stream.id(), FLOW_CONTROL_ERROR,
"Window size overflow for stream: %d", stream.id());
@ -510,28 +523,28 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
@Override
public int writableWindow() {
int writableWindow() {
return min(window, connectionWindowSize());
}
@Override
public int streamableBytes() {
int streamableBytes() {
return max(0, min(pendingBytes - allocated, window));
}
@Override
public int streamableBytesForTree() {
int streamableBytesForTree() {
return streamableBytesForTree;
}
@Override
public void enqueueFrame(FlowControlled frame) {
void enqueueFrame(FlowControlled frame) {
incrementPendingBytes(frame.size());
pendingWriteQueue.offer(frame);
}
@Override
public boolean hasFrame() {
boolean hasFrame() {
return !pendingWriteQueue.isEmpty();
}
@ -543,7 +556,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
@Override
public void cancel() {
void cancel() {
cancel(null);
}
@ -568,7 +581,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
@Override
public int writeBytes(int bytes) {
int writeBytes(int bytes) {
int bytesAttempted = 0;
while (hasFrame()) {
int maxBytes = min(bytes - bytesAttempted, writableWindow());
@ -683,94 +696,94 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* The remote flow control state for a single stream that is not in a state where flow controlled frames cannot
* be exchanged.
*/
private static final class ReducedState extends AbstractState {
public ReducedState(Http2Stream stream) {
private final class ReducedState extends AbstractState {
ReducedState(Http2Stream stream) {
super(stream);
}
public ReducedState(AbstractState existingState) {
ReducedState(AbstractState existingState) {
super(existingState);
}
@Override
public int windowSize() {
int windowSize() {
return 0;
}
@Override
public int initialWindowSize() {
int initialWindowSize() {
return 0;
}
@Override
public int writableWindow() {
int writableWindow() {
return 0;
}
@Override
public int streamableBytes() {
int streamableBytes() {
return 0;
}
@Override
public int streamableBytesForTree() {
int streamableBytesForTree() {
return streamableBytesForTree;
}
@Override
public void writeAllocatedBytes() {
void writeAllocatedBytes() {
throw new UnsupportedOperationException();
}
@Override
public void cancel() {
void cancel() {
}
@Override
public void window(int initialWindowSize) {
void window(int initialWindowSize) {
throw new UnsupportedOperationException();
}
@Override
public int incrementStreamWindow(int delta) throws Http2Exception {
int incrementStreamWindow(int delta) throws Http2Exception {
// This operation needs to be supported during the initial settings exchange when
// the peer has not yet acknowledged this peer being activated.
return 0;
}
@Override
public int writeBytes(int bytes) {
int writeBytes(int bytes) {
throw new UnsupportedOperationException();
}
@Override
public void enqueueFrame(FlowControlled frame) {
void enqueueFrame(FlowControlled frame) {
throw new UnsupportedOperationException();
}
@Override
public void allocate(int bytes) {
void allocate(int bytes) {
throw new UnsupportedOperationException();
}
@Override
public boolean hasFrame() {
boolean hasFrame() {
return false;
}
}
/**
* An abstraction around {@link FlowControlState} which provides specific extensions used by remote flow control.
* An abstraction which provides specific extensions used by remote flow control.
*/
private abstract static class AbstractState implements FlowControlState {
private abstract class AbstractState {
protected final Http2Stream stream;
protected int streamableBytesForTree;
public AbstractState(Http2Stream stream) {
AbstractState(Http2Stream stream) {
this.stream = stream;
}
public AbstractState(AbstractState existingState) {
AbstractState(AbstractState existingState) {
this.stream = existingState.stream();
this.streamableBytesForTree = existingState.streamableBytesForTree();
}
@ -778,7 +791,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
/**
* The stream this state is associated with.
*/
public final Http2Stream stream() {
final Http2Stream stream() {
return stream;
}
@ -786,72 +799,76 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* Recursively increments the {@link #streamableBytesForTree()} for this branch in the priority tree starting
* at the current node.
*/
public final void incrementStreamableBytesForTree(int numBytes) {
final void incrementStreamableBytesForTree(int numBytes) {
streamableBytesForTree += numBytes;
if (!stream.isRoot()) {
state(stream.parent()).incrementStreamableBytesForTree(numBytes);
}
}
abstract int windowSize();
abstract int initialWindowSize();
/**
* Write bytes allocated bytes for this stream.
*/
public abstract void writeAllocatedBytes();
abstract void writeAllocatedBytes();
/**
* Returns the number of pending bytes for this node that will fit within the
* {@link #availableWindow()}. This is used for the priority algorithm to determine the aggregate
* {@link #writableWindow()}. This is used for the priority algorithm to determine the aggregate
* number of bytes that can be written at each node. Each node only takes into account its
* stream window so that when a change occurs to the connection window, these values need
* not change (i.e. no tree traversal is required).
*/
public abstract int streamableBytes();
abstract int streamableBytes();
/**
* Get the {@link #streamableBytes()} for the entire tree rooted at this node.
*/
public abstract int streamableBytesForTree();
abstract int streamableBytesForTree();
/**
* Any operations that may be pending are cleared and the status of these operations is failed.
*/
public abstract void cancel();
abstract void cancel();
/**
* Reset the {@link #availableWindow()} size.
* Reset the window size for this stream.
*/
public abstract void window(int initialWindowSize);
abstract void window(int initialWindowSize);
/**
* Increments the flow control window for this stream by the given delta and returns the new value.
*/
public abstract int incrementStreamWindow(int delta) throws Http2Exception;
abstract int incrementStreamWindow(int delta) throws Http2Exception;
/**
* Returns the maximum writable window (minimum of the stream and connection windows).
*/
public abstract int writableWindow();
abstract int writableWindow();
/**
* Writes up to the number of bytes from the pending queue. May write less if limited by the writable window, by
* the number of pending writes available, or because a frame does not support splitting on arbitrary
* boundaries.
*/
public abstract int writeBytes(int bytes);
abstract int writeBytes(int bytes);
/**
* Adds the {@code frame} to the pending queue and increments the pending byte count.
*/
public abstract void enqueueFrame(FlowControlled frame);
abstract void enqueueFrame(FlowControlled frame);
/**
* Increment the number of bytes allocated to this stream by the priority algorithm
*/
public abstract void allocate(int bytes);
abstract void allocate(int bytes);
/**
* Indicates whether or not there are frames in the pending queue.
*/
public abstract boolean hasFrame();
abstract boolean hasFrame();
}
}

View File

@ -24,6 +24,7 @@ import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
@ -38,19 +39,11 @@ import io.netty.util.ByteString;
* stream. The decompression provided by this class will be applied to the data for the entire stream.
*/
public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecorator {
private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() {
@Override
public void onStreamRemoved(Http2Stream stream) {
final Http2Decompressor decompressor = decompressor(stream);
if (decompressor != null) {
cleanup(stream, decompressor);
}
}
};
private final Http2Connection connection;
private final boolean strict;
private boolean flowControllerInitialized;
final Http2Connection.PropertyKey propertyKey;
public DelegatingDecompressorFrameListener(Http2Connection connection, Http2FrameListener listener) {
this(connection, listener, true);
@ -62,7 +55,16 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
this.connection = connection;
this.strict = strict;
connection.addListener(CLEAN_UP_LISTENER);
propertyKey = connection.newKey();
connection.addListener(new Http2ConnectionAdapter() {
@Override
public void onStreamRemoved(Http2Stream stream) {
final Http2Decompressor decompressor = decompressor(stream);
if (decompressor != null) {
cleanup(stream, decompressor);
}
}
});
}
@Override
@ -210,7 +212,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
final EmbeddedChannel channel = newContentDecompressor(contentEncoding);
if (channel != null) {
decompressor = new Http2Decompressor(channel);
stream.setProperty(Http2Decompressor.class, decompressor);
stream.setProperty(propertyKey, decompressor);
// Decode the content and remove or replace the existing headers
// so that the message looks like a decoded message.
ByteString targetContentEncoding = getTargetContentEncoding(contentEncoding);
@ -237,8 +239,8 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
}
}
private static Http2Decompressor decompressor(Http2Stream stream) {
return (Http2Decompressor) (stream == null? null : stream.getProperty(Http2Decompressor.class));
Http2Decompressor decompressor(Http2Stream stream) {
return stream == null ? null : (Http2Decompressor) stream.getProperty(propertyKey);
}
/**
@ -248,7 +250,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
* @param stream The stream for which {@code decompressor} is the decompressor for
* @param decompressor The decompressor for {@code stream}
*/
private static void cleanup(Http2Stream stream, Http2Decompressor decompressor) {
private void cleanup(Http2Stream stream, Http2Decompressor decompressor) {
final EmbeddedChannel channel = decompressor.decompressor();
if (channel.finish()) {
for (;;) {
@ -259,7 +261,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
buf.release();
}
}
decompressor = stream.removeProperty(Http2Decompressor.class);
decompressor = stream.removeProperty(propertyKey);
}
/**
@ -286,7 +288,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
/**
* A decorator around the local flow controller that converts consumed bytes from uncompressed to compressed.
*/
private static final class ConsumedBytesConverter implements Http2LocalFlowController {
private final class ConsumedBytesConverter implements Http2LocalFlowController {
private final Http2LocalFlowController flowController;
ConsumedBytesConverter(Http2LocalFlowController flowController) {
@ -303,6 +305,16 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
return flowController.initialWindowSize();
}
@Override
public int windowSize(Http2Stream stream) {
return flowController.windowSize(stream);
}
@Override
public int initialWindowSize(Http2Stream stream) {
return flowController.initialWindowSize(stream);
}
@Override
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta)
throws Http2Exception {
@ -330,12 +342,12 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
flowController.consumeBytes(ctx, stream, numBytes);
} catch (Http2Exception e) {
if (copy != null) {
stream.setProperty(Http2Decompressor.class, copy);
stream.setProperty(propertyKey, copy);
}
throw e;
} catch (Throwable t) {
if (copy != null) {
stream.setProperty(Http2Decompressor.class, copy);
stream.setProperty(propertyKey, copy);
}
throw new Http2Exception(INTERNAL_ERROR,
"Error while returning bytes to flow control window", t);

View File

@ -278,6 +278,17 @@ public interface Http2Connection {
Endpoint<? extends Http2FlowController> opposite();
}
/**
* A key to be used for associating application-defined properties with streams within this connection.
*/
interface PropertyKey {
}
/**
* Creates a new key that is unique within this {@link Http2Connection}.
*/
PropertyKey newKey();
/**
* Adds a listener of stream life-cycle events.
*/

View File

@ -22,8 +22,8 @@ import io.netty.channel.ChannelHandlerContext;
public interface Http2FlowController {
/**
* Sets the initial flow control window and updates all stream windows (but not the connection
* window) by the delta.
* Sets the connection-wide initial flow control window and updates all stream windows (but not the connection
* stream window) by the delta.
* <p>
* This method is used to apply the {@code SETTINGS_INITIAL_WINDOW_SIZE} value for an
* {@code SETTINGS} frame.
@ -34,11 +34,24 @@ public interface Http2FlowController {
void initialWindowSize(int newWindowSize) throws Http2Exception;
/**
* Gets the initial flow control window size that is used as the basis for new stream flow
* Gets the connection-wide initial flow control window size that is used as the basis for new stream flow
* control windows.
*/
int initialWindowSize();
/**
* Get the portion of the flow control window for the given stream that is currently available for sending/receiving
* frames which are subject to flow control. This quantity is measured in number of bytes.
*/
int windowSize(Http2Stream stream);
/**
* Get the initial flow control window size for the given stream. This quantity is measured in number of bytes. Note
* the unavailable window portion can be calculated by {@link #initialWindowSize()} - {@link
* #windowSize(Http2Stream)}.
*/
int initialWindowSize(Http2Stream stream);
/**
* Increments the size of the stream's flow control window by the given delta.
* <p>

View File

@ -33,24 +33,6 @@ public interface Http2Stream {
CLOSED
}
/**
* Represents the state which flow controller implementations are expected to track.
*/
interface FlowControlState {
/**
* Get the portion of the flow control window that is available for sending/receiving frames which are subject
* to flow control. This quantity is measured in number of bytes.
*/
int windowSize();
/**
* Get the initial flow control window size. This quantity is measured in number of bytes.
* Note the unavailable window portion can be calculated by
* {@link #initialWindowSize()} - {@link #windowSize()}.
*/
int initialWindowSize();
}
/**
* Gets the unique identifier for this stream within the connection.
*/
@ -61,26 +43,6 @@ public interface Http2Stream {
*/
State state();
/**
* Get the state as related to the {@link Http2LocalFlowController}.
*/
FlowControlState localFlowState();
/**
* Set the state as related to the {@link Http2LocalFlowController}.
*/
void localFlowState(FlowControlState state);
/**
* Get the state as related to {@link Http2RemoteFlowController}.
*/
FlowControlState remoteFlowState();
/**
* Set the state as related to {@link Http2RemoteFlowController}.
*/
void remoteFlowState(FlowControlState state);
/**
* Opens this stream, making it available via {@link Http2Connection#forEachActiveStream(Http2StreamVisitor)} and
* transition state to:
@ -140,17 +102,17 @@ public interface Http2Stream {
* Associates the application-defined data with this stream.
* @return The value that was previously associated with {@code key}, or {@code null} if there was none.
*/
Object setProperty(Object key, Object value);
<V> V setProperty(Http2Connection.PropertyKey key, V value);
/**
* Returns application-defined data if any was associated with this stream.
*/
<V> V getProperty(Object key);
<V> V getProperty(Http2Connection.PropertyKey key);
/**
* Returns and removes application-defined data if any was associated with this stream.
*/
<V> V removeProperty(Object key);
<V> V removeProperty(Http2Connection.PropertyKey key);
/**
* Updates an priority for this stream. Calling this method may affect the straucture of the

View File

@ -67,6 +67,7 @@ public class DefaultHttp2LocalFlowControllerTest {
connection = new DefaultHttp2Connection(false);
controller = new DefaultHttp2LocalFlowController(connection, frameWriter, updateRatio);
connection.local().flowController(controller);
connection.local().createStream(STREAM_ID, false);
}
@ -320,7 +321,7 @@ public class DefaultHttp2LocalFlowControllerTest {
}
private int window(int streamId) throws Http2Exception {
return stream(streamId).localFlowState().windowSize();
return controller.windowSize(stream(streamId));
}
private Http2Stream stream(int streamId) {

View File

@ -86,6 +86,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
connection = new DefaultHttp2Connection(false);
controller = new DefaultHttp2RemoteFlowController(connection);
connection.remote().flowController(controller);
connection.local().createStream(STREAM_A, false);
connection.local().createStream(STREAM_B, false);
@ -1222,7 +1223,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
}
private int window(int streamId) throws Http2Exception {
return stream(streamId).remoteFlowState().windowSize();
return controller.windowSize(stream(streamId));
}
private void incrementWindowSize(int streamId, int delta) throws Http2Exception {

View File

@ -15,6 +15,7 @@
package io.netty.microbench.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Exception;
@ -35,6 +36,16 @@ public final class NoopHttp2LocalFlowController implements Http2LocalFlowControl
return MAX_INITIAL_WINDOW_SIZE;
}
@Override
public int windowSize(Http2Stream stream) {
return MAX_INITIAL_WINDOW_SIZE;
}
@Override
public int initialWindowSize(Http2Stream stream) {
return MAX_INITIAL_WINDOW_SIZE;
}
@Override
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta)
throws Http2Exception {

View File

@ -15,6 +15,7 @@
package io.netty.microbench.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2RemoteFlowController;
@ -24,6 +25,7 @@ public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowContr
public static final NoopHttp2RemoteFlowController INSTANCE = new NoopHttp2RemoteFlowController();
private NoopHttp2RemoteFlowController() { }
@Override
public void initialWindowSize(int newWindowSize) throws Http2Exception {
}
@ -33,6 +35,16 @@ public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowContr
return MAX_INITIAL_WINDOW_SIZE;
}
@Override
public int windowSize(Http2Stream stream) {
return MAX_INITIAL_WINDOW_SIZE;
}
@Override
public int initialWindowSize(Http2Stream stream) {
return MAX_INITIAL_WINDOW_SIZE;
}
@Override
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta)
throws Http2Exception {