Grab-bag of minor changes to HTTP/2

Motivation:

Various small fixes/improvements to the interface to the HTTP/2 classes,
as well as some minor performance improvements.

Modifications:

- Added fix for IntObjectHashMap to ensure that capacity is always odd.
Even capacity can cause probing to fail.
- Cleaned the access to GOAWAY information in Http2Connection interface.
Endpoints now manage their own state for GOAWAY. Also added a goingAway
event handler.
- Added Endpoint methods for checking MAX_CONCURRENT_STREAMS or if the
number of streams for the endpoint have been exhausted. See
Endpoint.nextStreamId()/acceptingNewStreams().
- Changed DefaultHttp2Connection to use IntObjectHashMap. This should be
a slight memory improvement.
- Fixed check for MAX_CONCURRENT_STREAMS to correctly use the number of
active streams for the endpoint (not total active). See
DefaultHttp2Connection.checkNewStreamAllowed.
- Exposing a few methods to subclasses of AbstractHttp2ConnectionHandler
(e.g. exception handling).
- Cleaning up GOAWAY and RST_STREAM handling in
AbstractHttp2ConnectionHandler.

Result:

HTTP/2 code should provide more information to subclasses and will have
a reduced memory footprint.
This commit is contained in:
nmittler 2014-06-18 09:49:37 -07:00 committed by Norman Maurer
parent 760bbc7ea6
commit 217a666b1f
9 changed files with 700 additions and 232 deletions

View File

@ -25,6 +25,7 @@ import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
@ -189,7 +190,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ChannelFuture future = ctx.newSucceededFuture();
for (Http2Stream stream : connection.activeStreams().toArray(new Http2Stream[0])) {
close(stream, ctx, future);
close(stream, future);
}
super.channelInactive(ctx);
}
@ -201,7 +202,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof Http2Exception) {
processHttp2Exception(ctx, (Http2Exception) cause);
onHttp2Exception(ctx, (Http2Exception) cause);
}
super.exceptionCaught(ctx, cause);
@ -410,7 +411,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
// If the headers are the end of the stream, close it now.
if (endStream) {
closeLocalSide(stream, ctx, promise);
closeLocalSide(stream, promise);
}
}
@ -447,7 +448,8 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
return promise;
}
close(stream, ctx, promise);
stream.terminateSent();
close(stream, promise);
return frameWriter.writeRstStream(ctx, promise, streamId, errorCode);
}
@ -525,7 +527,113 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
frameReader.readFrame(ctx, in, internalFrameObserver);
} catch (Http2Exception e) {
processHttp2Exception(ctx, e);
onHttp2Exception(ctx, e);
}
}
/**
* Processes the given exception. Depending on the type of exception, delegates to either
* {@link #processConnectionError} or {@link #processStreamError}.
*/
protected final void onHttp2Exception(ChannelHandlerContext ctx, Http2Exception e) {
if (e instanceof Http2StreamException) {
onStreamError(ctx, (Http2StreamException) e);
} else {
onConnectionError(ctx, e);
}
}
/**
* Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint and waits until
* all streams are closed before shutting down the connection.
*/
protected void onConnectionError(ChannelHandlerContext ctx, Http2Exception cause) {
sendGoAway(ctx, ctx.newPromise(), cause);
}
/**
* Handler for a stream error. Sends a RST_STREAM frame to the remote endpoint and closes the stream.
*/
protected void onStreamError(ChannelHandlerContext ctx, Http2StreamException cause) {
// Send the RST_STREAM frame to the remote endpoint.
int streamId = cause.streamId();
frameWriter.writeRstStream(ctx, ctx.newPromise(), streamId, cause.error().code());
// Mark the stream as terminated and close it.
Http2Stream stream = connection.stream(streamId);
if (stream != null) {
stream.terminateSent();
close(stream, null);
}
}
/**
* Sends a GO_AWAY frame to the remote endpoint. Waits until all streams are closed before
* shutting down the connection.
*
* @param ctx the handler context
* @param promise the promise used to create the close listener.
* @param cause connection error that caused this GO_AWAY, or {@code null} if normal
* termination.
*/
protected final void sendGoAway(ChannelHandlerContext ctx, ChannelPromise promise,
Http2Exception cause) {
ChannelFuture future = null;
ChannelPromise closePromise = promise;
if (!connection.isGoAway()) {
int errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
ByteBuf debugData = toByteBuf(ctx, cause);
int lastKnownStream = connection.remote().lastStreamCreated();
future = frameWriter.writeGoAway(ctx, promise, lastKnownStream, errorCode, debugData);
closePromise = null;
connection.remote().goAwayReceived(lastKnownStream);
}
closeListener = getOrCreateCloseListener(ctx, closePromise);
// If there are no active streams, close immediately after the send is complete.
// Otherwise wait until all streams are inactive.
if (cause != null || connection.numActiveStreams() == 0) {
if (future == null) {
future = ctx.newSucceededFuture();
}
future.addListener(closeListener);
}
}
/**
* If not already created, creates a new listener for the given promise which, when complete,
* closes the connection and frees any resources.
*/
private ChannelFutureListener getOrCreateCloseListener(final ChannelHandlerContext ctx,
ChannelPromise promise) {
final ChannelPromise closePromise = promise == null? ctx.newPromise() : promise;
if (closeListener == null) {
// If no promise was provided, create a new one.
closeListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
ctx.close(closePromise);
freeResources();
}
};
} else {
closePromise.setSuccess();
}
return closeListener;
}
/**
* Frees any resources maintained by this handler.
*/
private void freeResources() {
frameReader.close();
frameWriter.close();
if (clientPrefaceString != null) {
clientPrefaceString.release();
clientPrefaceString = null;
}
}
@ -566,113 +674,52 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
}
/**
* Processes the given exception. Depending on the type of exception, delegates to either
* {@link #processConnectionError} or {@link #processStreamError}.
* Closes the remote side of the given stream. If this causes the stream to be closed, adds a
* hook to close the channel after the given future completes.
*
* @param stream the stream to be half closed.
* @param future If closing, the future after which to close the channel. If {@code null},
* ignored.
*/
private void processHttp2Exception(ChannelHandlerContext ctx, Http2Exception e) {
if (e instanceof Http2StreamException) {
processStreamError(ctx, (Http2StreamException) e);
} else {
processConnectionError(ctx, e);
}
}
private void processConnectionError(ChannelHandlerContext ctx, Http2Exception cause) {
sendGoAway(ctx, ctx.newPromise(), cause);
}
private void processStreamError(ChannelHandlerContext ctx, Http2StreamException cause) {
// Close the stream if it was open.
int streamId = cause.streamId();
Http2Stream stream = connection.stream(streamId);
if (stream != null) {
close(stream, ctx, null);
}
// Send the Rst frame to the remote endpoint.
frameWriter.writeRstStream(ctx, ctx.newPromise(), streamId, cause.error().code());
}
private void sendGoAway(ChannelHandlerContext ctx, ChannelPromise promise,
Http2Exception cause) {
ChannelFuture future = null;
ChannelPromise closePromise = promise;
if (!connection.isGoAway()) {
connection.goAwaySent();
int errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
ByteBuf debugData = toByteBuf(ctx, cause);
future = frameWriter.writeGoAway(ctx, promise, connection.remote().lastStreamCreated(),
errorCode, debugData);
closePromise = null;
}
closeListener = getOrCreateCloseListener(ctx, closePromise);
// If there are no active streams, close immediately after the send is complete.
// Otherwise wait until all streams are inactive.
if (cause != null || connection.numActiveStreams() == 0) {
if (future == null) {
future = ctx.newSucceededFuture();
}
future.addListener(closeListener);
}
}
private ChannelFutureListener getOrCreateCloseListener(final ChannelHandlerContext ctx,
ChannelPromise promise) {
final ChannelPromise closePromise = promise == null? ctx.newPromise() : promise;
if (closeListener == null) {
// If no promise was provided, create a new one.
closeListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
ctx.close(closePromise);
freeResources();
}
};
} else {
closePromise.setSuccess();
}
return closeListener;
}
private void freeResources() {
frameReader.close();
frameWriter.close();
if (clientPrefaceString != null) {
clientPrefaceString.release();
clientPrefaceString = null;
}
}
private void closeLocalSide(Http2Stream stream, ChannelHandlerContext ctx, ChannelFuture future) {
private void closeLocalSide(Http2Stream stream, ChannelFuture future) {
switch (stream.state()) {
case HALF_CLOSED_LOCAL:
case OPEN:
stream.closeLocalSide();
break;
default:
close(stream, ctx, future);
close(stream, future);
break;
}
}
private void closeRemoteSide(Http2Stream stream, ChannelHandlerContext ctx, ChannelFuture future) {
/**
* Closes the remote side of the given stream. If this causes the stream to be closed, adds a
* hook to close the channel after the given future completes.
*
* @param stream the stream to be half closed.
* @param future If closing, the future after which to close the channel. If {@code null},
* ignored.
*/
private void closeRemoteSide(Http2Stream stream, ChannelFuture future) {
switch (stream.state()) {
case HALF_CLOSED_REMOTE:
case OPEN:
stream.closeRemoteSide();
break;
default:
close(stream, ctx, future);
close(stream, future);
break;
}
}
private void close(Http2Stream stream, ChannelHandlerContext ctx, ChannelFuture future) {
/**
* Closes the given stream and adds a hook to close the channel after the given future completes.
*
* @param stream the stream to be closed.
* @param future the future after which to close the channel. If {@code null}, ignored.
*/
private void close(Http2Stream stream, ChannelFuture future) {
stream.close();
// If this connection is closing and there are no longer any
@ -815,12 +862,15 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
}
});
if (isInboundStreamAfterGoAway(streamId)) {
verifyGoAwayNotReceived();
verifyRstStreamNotReceived(stream);
if (shouldIgnoreFrame(stream)) {
// Ignore this frame.
return;
}
if (endOfStream) {
closeRemoteSide(stream, ctx, ctx.newSucceededFuture());
closeRemoteSide(stream, ctx.newSucceededFuture());
}
AbstractHttp2ConnectionHandler.this.onDataRead(ctx, streamId, data, padding, endOfStream,
@ -840,11 +890,14 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
boolean endStream, boolean endSegment) throws Http2Exception {
verifyPrefaceReceived();
if (isInboundStreamAfterGoAway(streamId)) {
Http2Stream stream = connection.stream(streamId);
verifyGoAwayNotReceived();
verifyRstStreamNotReceived(stream);
if (connection.remote().isGoAwayReceived() || (stream != null && shouldIgnoreFrame(stream))) {
// Ignore this frame.
return;
}
Http2Stream stream = connection.stream(streamId);
if (stream == null) {
createRemoteStream(streamId, endStream, streamDependency, weight, exclusive);
} else {
@ -867,7 +920,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
// If the headers completes this stream, close it.
if (endStream) {
closeRemoteSide(stream, ctx, ctx.newSucceededFuture());
closeRemoteSide(stream, ctx.newSucceededFuture());
}
}
@ -880,13 +933,15 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
short weight, boolean exclusive) throws Http2Exception {
verifyPrefaceReceived();
if (isInboundStreamAfterGoAway(streamId)) {
Http2Stream stream = connection.requireStream(streamId);
verifyGoAwayNotReceived();
verifyRstStreamNotReceived(stream);
if (stream.state() == CLOSED || shouldIgnoreFrame(stream)) {
// Ignore frames for any stream created after we sent a go-away.
return;
}
// Set the priority for this stream on the flow controller.
connection.requireStream(streamId).setPriority(streamDependency, weight, exclusive);
stream.setPriority(streamDependency, weight, exclusive);
AbstractHttp2ConnectionHandler.this.onPriorityRead(ctx, streamId, streamDependency,
weight, exclusive);
@ -897,18 +952,15 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
throws Http2Exception {
verifyPrefaceReceived();
if (isInboundStreamAfterGoAway(streamId)) {
// Ignore frames for any stream created after we sent a go-away.
return;
}
Http2Stream stream = connection.stream(streamId);
if (stream == null) {
Http2Stream stream = connection.requireStream(streamId);
verifyRstStreamNotReceived(stream);
if (stream.state() == CLOSED) {
// RstStream frames must be ignored for closed streams.
return;
}
close(stream, ctx, ctx.newSucceededFuture());
stream.terminateReceived();
close(stream, ctx.newSucceededFuture());
AbstractHttp2ConnectionHandler.this.onRstStreamRead(ctx, streamId, errorCode);
}
@ -963,13 +1015,15 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
int promisedStreamId, Http2Headers headers, int padding) throws Http2Exception {
verifyPrefaceReceived();
if (isInboundStreamAfterGoAway(streamId)) {
Http2Stream parentStream = connection.requireStream(streamId);
verifyGoAwayNotReceived();
verifyRstStreamNotReceived(parentStream);
if (shouldIgnoreFrame(parentStream)) {
// Ignore frames for any stream created after we sent a go-away.
return;
}
// Reserve the push stream based with a priority based on the current stream's priority.
Http2Stream parentStream = connection.requireStream(streamId);
connection.remote().reservePushStream(promisedStreamId, parentStream);
AbstractHttp2ConnectionHandler.this.onPushPromiseRead(ctx, streamId, promisedStreamId,
@ -980,7 +1034,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
throws Http2Exception {
// Don't allow any more connections to be created.
connection.goAwayReceived();
connection.local().goAwayReceived(lastStreamId);
AbstractHttp2ConnectionHandler.this.onGoAwayRead(ctx, lastStreamId, errorCode, debugData);
}
@ -990,20 +1044,14 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
int windowSizeIncrement) throws Http2Exception {
verifyPrefaceReceived();
if (isInboundStreamAfterGoAway(streamId)) {
Http2Stream stream = connection.requireStream(streamId);
verifyGoAwayNotReceived();
verifyRstStreamNotReceived(stream);
if (stream.state() == CLOSED || shouldIgnoreFrame(stream)) {
// Ignore frames for any stream created after we sent a go-away.
return;
}
if (streamId > 0) {
Http2Stream stream = connection.stream(streamId);
if (stream == null) {
// Window Update frames must be ignored for closed streams.
return;
}
stream.verifyState(PROTOCOL_ERROR, OPEN, HALF_CLOSED_REMOTE);
}
// Update the outbound flow controller.
outboundFlow.updateOutboundWindowSize(streamId, windowSizeIncrement);
@ -1024,20 +1072,14 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
public void onBlockedRead(ChannelHandlerContext ctx, int streamId) throws Http2Exception {
verifyPrefaceReceived();
if (isInboundStreamAfterGoAway(streamId)) {
// Ignore frames for any stream created after we sent a go-away.
Http2Stream stream = connection.requireStream(streamId);
verifyGoAwayNotReceived();
verifyRstStreamNotReceived(stream);
if (stream.state() == CLOSED || shouldIgnoreFrame(stream)) {
// Ignored for closed streams.
return;
}
if (streamId > 0) {
Http2Stream stream = connection.stream(streamId);
if (stream == null) {
// Window Update frames must be ignored for closed streams.
return;
}
stream.verifyState(PROTOCOL_ERROR, OPEN, HALF_CLOSED_REMOTE);
}
// Update the outbound flow controller.
outboundFlow.setBlocked(streamId);
@ -1045,12 +1087,39 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
}
/**
* Determines whether or not the stream was created after we sent a go-away frame. Frames
* from streams created after we sent a go-away should be ignored. Frames for the connection
* stream ID (i.e. 0) will always be allowed.
* Indicates whether or not frames for the given stream should be ignored based on the state
* of the stream/connection.
*/
private boolean isInboundStreamAfterGoAway(int streamId) {
return connection.isGoAwaySent() && connection.remote().lastStreamCreated() <= streamId;
private boolean shouldIgnoreFrame(Http2Stream stream) {
if (connection.remote().isGoAwayReceived() && connection.remote().lastStreamCreated() <= stream.id()) {
// Frames from streams created after we sent a go-away should be ignored.
// Frames for the connection stream ID (i.e. 0) will always be allowed.
return true;
}
// Also ignore inbound frames after we sent a RST_STREAM frame.
return stream.isTerminateSent();
}
/**
* Verifies that a GO_AWAY frame was not previously received from the remote endpoint. If it
* was, throws an exception.
*/
private void verifyGoAwayNotReceived() throws Http2Exception {
if (connection.local().isGoAwayReceived()) {
throw protocolError("Received frames after receiving GO_AWAY");
}
}
/**
* Verifies that a RST_STREAM frame was not previously received for the given stream. If it
* was, throws an exception.
*/
private void verifyRstStreamNotReceived(Http2Stream stream) throws Http2Exception {
if (stream != null && stream.isTerminateReceived()) {
throw new Http2StreamException(stream.id(), STREAM_CLOSED,
"Frame received after receiving RST_STREAM for stream: " + stream.id());
}
}
}
@ -1120,7 +1189,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
// original
// future that was returned to the caller.
failAllPromises(future.cause());
processHttp2Exception(ctx,
onHttp2Exception(ctx,
toHttp2Exception(future.cause()));
}
}
@ -1129,7 +1198,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
// Close the local side of the stream if this is the last frame
if (endStream) {
Http2Stream stream = connection.stream(streamId);
closeLocalSide(stream, ctx, ctx.newPromise());
closeLocalSide(stream, ctx.newPromise());
}
}

View File

@ -30,14 +30,14 @@ import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
import io.netty.handler.codec.http2.Http2StreamRemovalPolicy.Action;
import io.netty.util.collection.Collections;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
/**
@ -46,14 +46,12 @@ import java.util.Set;
public class DefaultHttp2Connection implements Http2Connection {
private final Set<Listener> listeners = new HashSet<Listener>(4);
private final Map<Integer, Http2Stream> streamMap = new HashMap<Integer, Http2Stream>();
private final IntObjectMap<Http2Stream> streamMap = new IntObjectHashMap<Http2Stream>();
private final ConnectionStream connectionStream = new ConnectionStream();
private final Set<Http2Stream> activeStreams = new LinkedHashSet<Http2Stream>();
private final DefaultEndpoint localEndpoint;
private final DefaultEndpoint remoteEndpoint;
private final Http2StreamRemovalPolicy removalPolicy;
private boolean goAwaySent;
private boolean goAwayReceived;
/**
* Creates a connection with compression disabled and an immediate stream removal policy.
@ -143,7 +141,7 @@ public class DefaultHttp2Connection implements Http2Connection {
@Override
public Set<Http2Stream> activeStreams() {
return Collections.unmodifiableSet(activeStreams);
return java.util.Collections.unmodifiableSet(activeStreams);
}
@Override
@ -156,29 +154,9 @@ public class DefaultHttp2Connection implements Http2Connection {
return remoteEndpoint;
}
@Override
public void goAwaySent() {
goAwaySent = true;
}
@Override
public void goAwayReceived() {
goAwayReceived = true;
}
@Override
public boolean isGoAwaySent() {
return goAwaySent;
}
@Override
public boolean isGoAwayReceived() {
return goAwayReceived;
}
@Override
public boolean isGoAway() {
return isGoAwaySent() || isGoAwayReceived();
return localEndpoint.isGoAwayReceived() || remoteEndpoint.isGoAwayReceived();
}
private void addStream(DefaultStream stream) {
@ -203,20 +181,36 @@ public class DefaultHttp2Connection implements Http2Connection {
((DefaultStream) stream.parent()).removeChild(stream);
}
private void activate(Http2Stream stream) {
private void activate(DefaultStream stream) {
activeStreams.add(stream);
// Update the number of active streams initiated by the endpoint.
stream.createdBy().numActiveStreams++;
// Notify the listeners.
for (Listener listener : listeners) {
listener.streamActive(stream);
}
}
private void deactivate(Http2Stream stream) {
private void deactivate(DefaultStream stream) {
activeStreams.remove(stream);
// Update the number of active streams initiated by the endpoint.
stream.createdBy().numActiveStreams--;
// Notify the listeners.
for (Listener listener : listeners) {
listener.streamInactive(stream);
}
}
private void notifyGoingAway() {
for (Listener listener : listeners) {
listener.goingAway();
}
}
private void notifyHalfClosed(Http2Stream stream) {
for (Listener listener : listeners) {
listener.streamHalfClosed(stream);
@ -243,10 +237,13 @@ public class DefaultHttp2Connection implements Http2Connection {
private State state = IDLE;
private short weight = DEFAULT_PRIORITY_WEIGHT;
private DefaultStream parent;
private Map<Integer, DefaultStream> children = newChildMap();
private IntObjectMap<DefaultStream> children = newChildMap();
private int totalChildWeights;
private boolean terminateSent;
private boolean terminateReceived;
private FlowState inboundFlow;
private FlowState outboundFlow;
private Object data;
DefaultStream(int id) {
this.id = id;
@ -262,6 +259,42 @@ public class DefaultHttp2Connection implements Http2Connection {
return state;
}
@Override
public boolean isTerminateReceived() {
return terminateReceived;
}
@Override
public void terminateReceived() {
terminateReceived = true;
}
@Override
public boolean isTerminateSent() {
return terminateSent;
}
@Override
public void terminateSent() {
terminateSent = true;
}
@Override
public boolean isTerminated() {
return terminateSent || terminateReceived;
}
@Override
public void data(Object data) {
this.data = data;
}
@SuppressWarnings("unchecked")
@Override
public <T> T data() {
return (T) data;
}
@Override
public FlowState inboundFlow() {
return inboundFlow;
@ -326,7 +359,8 @@ public class DefaultHttp2Connection implements Http2Connection {
@Override
public final Collection<? extends Http2Stream> children() {
return Collections.unmodifiableCollection(children.values());
DefaultStream[] childrenArray = children.values(DefaultStream.class);
return Arrays.asList(childrenArray);
}
@Override
@ -471,6 +505,10 @@ public class DefaultHttp2Connection implements Http2Connection {
return state == HALF_CLOSED_REMOTE || state == OPEN || state == RESERVED_LOCAL;
}
final DefaultEndpoint createdBy() {
return localEndpoint.createdStreamId(id)? localEndpoint : remoteEndpoint;
}
final void weight(short weight) {
if (parent != null && weight != this.weight) {
int delta = weight - this.weight;
@ -479,13 +517,13 @@ public class DefaultHttp2Connection implements Http2Connection {
this.weight = weight;
}
final Map<Integer, DefaultStream> removeAllChildren() {
final IntObjectMap<DefaultStream> removeAllChildren() {
if (children.isEmpty()) {
return Collections.emptyMap();
return Collections.emptyIntObjectMap();
}
totalChildWeights = 0;
Map<Integer, DefaultStream> prevChildren = children;
IntObjectMap<DefaultStream> prevChildren = children;
children = newChildMap();
return prevChildren;
}
@ -499,7 +537,7 @@ public class DefaultHttp2Connection implements Http2Connection {
// If it was requested that this child be the exclusive dependency of this node,
// move any previous children to the child node, becoming grand children
// of this node.
for (DefaultStream grandchild : removeAllChildren().values()) {
for (DefaultStream grandchild : removeAllChildren().values(DefaultStream.class)) {
child.addChild(grandchild, false);
}
}
@ -520,7 +558,7 @@ public class DefaultHttp2Connection implements Http2Connection {
totalChildWeights -= child.weight();
// Move up any grand children to be directly dependent on this node.
for (DefaultStream grandchild : child.children.values()) {
for (DefaultStream grandchild : child.children.values(DefaultStream.class)) {
addChild(grandchild, false);
}
}
@ -537,8 +575,8 @@ public class DefaultHttp2Connection implements Http2Connection {
}
}
private static <T> Map<Integer, DefaultStream> newChildMap() {
return new LinkedHashMap<Integer, DefaultStream>(4);
private static <T> IntObjectMap<DefaultStream> newChildMap() {
return new IntObjectHashMap<DefaultStream>(4);
}
/**
@ -589,10 +627,20 @@ public class DefaultHttp2Connection implements Http2Connection {
private final boolean server;
private int nextStreamId;
private int lastStreamCreated;
private int maxStreams;
private int lastKnownStream = -1;
private boolean pushToAllowed;
private boolean allowCompressedData;
/**
* The maximum number of active streams allowed to be created by this endpoint.
*/
private int maxStreams;
/**
* The current number of active streams created by this endpoint.
*/
private int numActiveStreams;
DefaultEndpoint(boolean server, boolean allowCompressedData) {
this.allowCompressedData = allowCompressedData;
this.server = server;
@ -615,6 +663,17 @@ public class DefaultHttp2Connection implements Http2Connection {
return nextStreamId > 1? nextStreamId : nextStreamId + 2;
}
@Override
public boolean createdStreamId(int streamId) {
boolean even = (streamId & 1) == 0;
return server == even;
}
@Override
public boolean acceptingNewStreams() {
return nextStreamId() > 0 && numActiveStreams + 1 <= maxStreams;
}
@Override
public DefaultStream createStream(int streamId, boolean halfClosed) throws Http2Exception {
checkNewStreamAllowed(streamId);
@ -681,6 +740,11 @@ public class DefaultHttp2Connection implements Http2Connection {
return pushToAllowed;
}
@Override
public int numActiveStreams() {
return numActiveStreams;
}
@Override
public int maxStreams() {
return maxStreams;
@ -706,6 +770,25 @@ public class DefaultHttp2Connection implements Http2Connection {
return lastStreamCreated;
}
@Override
public int lastKnownStream() {
return isGoAwayReceived()? lastKnownStream : lastStreamCreated;
}
@Override
public boolean isGoAwayReceived() {
return lastKnownStream >= 0;
}
@Override
public void goAwayReceived(int lastKnownStream) {
boolean alreadyNotified = isGoAway();
this.lastKnownStream = lastKnownStream;
if (!alreadyNotified) {
notifyGoingAway();
}
}
@Override
public Endpoint opposite() {
return isLocal() ? remoteEndpoint : localEndpoint;
@ -716,7 +799,7 @@ public class DefaultHttp2Connection implements Http2Connection {
throw protocolError("Cannot create a stream since the connection is going away");
}
verifyStreamId(streamId);
if (streamMap.size() + 1 > maxStreams) {
if (!acceptingNewStreams()) {
throw protocolError("Maximum streams exceeded for this endpoint.");
}
}
@ -729,8 +812,7 @@ public class DefaultHttp2Connection implements Http2Connection {
throw protocolError("Request stream %d is behind the next expected stream %d",
streamId, nextStreamId);
}
boolean even = (streamId & 1) == 0;
if (server != even) {
if (!createdStreamId(streamId)) {
throw protocolError("Request stream %d is not correct for %s connection", streamId,
server ? "server" : "client");
}

View File

@ -84,6 +84,11 @@ public interface Http2Connection {
* @param subtreeRoot the new root of the subtree that has changed.
*/
void streamPrioritySubtreeChanged(Http2Stream stream, Http2Stream subtreeRoot);
/**
* Called when a GO_AWAY frame has either been sent or received for the connection.
*/
void goingAway();
}
/**
@ -92,10 +97,24 @@ public interface Http2Connection {
interface Endpoint {
/**
* Returns the next valid streamId for this endpoint.
* Returns the next valid streamId for this endpoint. If negative, the stream IDs are
* exhausted for this endpoint an no further streams may be created.
*/
int nextStreamId();
/**
* Indicates whether the given streamId is from the set of IDs used by this endpoint to
* create new streams.
*/
boolean createdStreamId(int streamId);
/**
* Indicates whether or not this endpoint is currently accepting new streams. This will be
* be false if {@link #numActiveStreams()} + 1 >= {@link #maxStreams()} or if the stream IDs
* for this endpoint have been exhausted (i.e. {@link #nextStreamId()} < 0).
*/
boolean acceptingNewStreams();
/**
* Creates a stream initiated by this endpoint. This could fail for the following reasons:
* <p/>
@ -143,6 +162,11 @@ public interface Http2Connection {
*/
boolean allowPushTo();
/**
* Gets the number of currently active streams that were created by this endpoint.
*/
int numActiveStreams();
/**
* Gets the maximum number of concurrent streams allowed by this endpoint.
*/
@ -168,6 +192,24 @@ public interface Http2Connection {
*/
int lastStreamCreated();
/**
* Gets the last stream created by this endpoint that is "known" by the opposite endpoint.
* If a GOAWAY was received for this endpoint, this will be the last stream ID from the
* GOAWAY frame. Otherwise, this will be same as {@link #lastStreamCreated()}.
*/
int lastKnownStream();
/**
* Indicates whether or not a GOAWAY was received by this endpoint.
*/
boolean isGoAwayReceived();
/**
* Indicates that a GOAWAY was received from the opposite endpoint and sets the last known stream
* created by this endpoint.
*/
void goAwayReceived(int lastKnownStream);
/**
* Gets the {@link Endpoint} opposite this one.
*/
@ -227,30 +269,7 @@ public interface Http2Connection {
Endpoint remote();
/**
* Marks that a GoAway frame has been sent on this connection. After calling this, both
* {@link #isGoAwaySent()} and {@link #isGoAway()} will be {@code true}.
*/
void goAwaySent();
/**
* Marks that a GoAway frame has been received on this connection. After calling this, both
* {@link #isGoAwayReceived()} and {@link #isGoAway()} will be {@code true}.
*/
void goAwayReceived();
/**
* Indicates that this connection received a GoAway message.
*/
boolean isGoAwaySent();
/**
* Indicates that this connection send a GoAway message.
*/
boolean isGoAwayReceived();
/**
* Indicates whether or not this endpoint is going away. This is a short form for
* {@link #isGoAwaySent()} || {@link #isGoAwayReceived()}.
* Indicates whether or not either endpoint has received a GOAWAY.
*/
boolean isGoAway();
}

View File

@ -46,4 +46,8 @@ public class Http2ConnectionAdapter implements Http2Connection.Listener {
@Override
public void streamPrioritySubtreeChanged(Http2Stream stream, Http2Stream subtreeRoot) {
}
@Override
public void goingAway() {
}
}

View File

@ -72,6 +72,34 @@ public interface Http2Stream {
*/
Http2Stream closeRemoteSide();
/**
* Indicates whether a RST_STREAM frame has been received from the remote endpoint for this stream.
*/
boolean isTerminateReceived();
/**
* Sets the flag indicating that a RST_STREAM frame has been received from the remote endpoint
* for this stream. This does not affect the stream state.
*/
void terminateReceived();
/**
* Indicates whether a RST_STREAM frame has been sent from the local endpoint for this stream.
*/
boolean isTerminateSent();
/**
* Sets the flag indicating that a RST_STREAM frame has been sent from the local endpoint
* for this stream. This does not affect the stream state.
*/
void terminateSent();
/**
* Indicates whether or not this stream has been terminated. This is a short form for
* {@link #isTerminateSent()} || {@link #isTerminateReceived()}.
*/
boolean isTerminated();
/**
* Indicates whether the remote side of this stream is open (i.e. the state is either
* {@link State#OPEN} or {@link State#HALF_CLOSED_LOCAL}).
@ -84,6 +112,16 @@ public interface Http2Stream {
*/
boolean localSideOpen();
/**
* Associates the application-defined data with this stream.
*/
void data(Object data);
/**
* Returns application-defined data if any was associated with this stream.
*/
<T> T data();
/**
* Gets the in-bound flow control state for this stream.
*/

View File

@ -156,7 +156,7 @@ public class DefaultHttp2ConnectionTest {
@Test(expected = Http2Exception.class)
public void goAwayReceivedShouldDisallowCreation() throws Http2Exception {
server.goAwayReceived();
server.local().goAwayReceived(0);
server.remote().createStream(3, true);
}

View File

@ -21,6 +21,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
import static io.netty.handler.codec.http2.Http2CodecUtil.emptyPingBuf;
import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static io.netty.handler.codec.http2.Http2Headers.EMPTY_HEADERS;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
@ -51,6 +52,7 @@ import java.util.Arrays;
import java.util.Collections;
import io.netty.channel.DefaultChannelPromise;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -216,7 +218,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
handler.close(ctx, promise);
verify(writer).writeGoAway(eq(ctx), eq(promise), eq(0), eq((long) NO_ERROR.code()),
eq(EMPTY_BUFFER));
verify(connection).goAwaySent();
verify(remote).goAwayReceived(0);
}
@Test
@ -237,15 +239,16 @@ public class DelegatingHttp2ConnectionHandlerTest {
@Test
public void connectionErrorShouldSendGoAway() throws Exception {
Http2Exception e = new Http2Exception(PROTOCOL_ERROR);
when(remote.lastStreamCreated()).thenReturn(STREAM_ID);
handler.exceptionCaught(ctx, e);
verify(connection).goAwaySent();
verify(writer).writeGoAway(eq(ctx), eq(promise), eq(0), eq((long) PROTOCOL_ERROR.code()),
verify(remote).goAwayReceived(STREAM_ID);
verify(writer).writeGoAway(eq(ctx), eq(promise), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code()),
eq(EMPTY_BUFFER));
}
@Test
public void dataReadAfterGoAwayShouldApplyFlowControl() throws Exception {
when(connection.isGoAwaySent()).thenReturn(true);
when(remote.isGoAwayReceived()).thenReturn(true);
decode().onDataRead(ctx, STREAM_ID, dummyData(), 10, true, true, true);
verify(inboundFlow).applyInboundFlowControl(eq(STREAM_ID), eq(dummyData()), eq(10),
eq(true), eq(true), eq(true), any(Http2InboundFlowController.FrameWriter.class));
@ -284,7 +287,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
@Test
public void headersReadAfterGoAwayShouldBeIgnored() throws Exception {
when(connection.isGoAwaySent()).thenReturn(true);
when(remote.isGoAwayReceived()).thenReturn(true);
decode().onHeadersRead(ctx, STREAM_ID, EMPTY_HEADERS, 0, false, false);
verify(remote, never()).createStream(eq(STREAM_ID), eq(false));
@ -331,7 +334,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
@Test
public void pushPromiseReadAfterGoAwayShouldBeIgnored() throws Exception {
when(connection.isGoAwaySent()).thenReturn(true);
when(remote.isGoAwayReceived()).thenReturn(true);
decode().onPushPromiseRead(ctx, STREAM_ID, PUSH_STREAM_ID, EMPTY_HEADERS, 0);
verify(remote, never()).reservePushStream(anyInt(), any(Http2Stream.class));
verify(observer, never()).onPushPromiseRead(eq(ctx), anyInt(), anyInt(),
@ -348,7 +351,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
@Test
public void priorityReadAfterGoAwayShouldBeIgnored() throws Exception {
when(connection.isGoAwaySent()).thenReturn(true);
when(remote.isGoAwayReceived()).thenReturn(true);
decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true);
verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean());
verify(observer, never()).onPriorityRead(eq(ctx), anyInt(), anyInt(), anyShort(), anyBoolean());
@ -363,17 +366,16 @@ public class DelegatingHttp2ConnectionHandlerTest {
@Test
public void windowUpdateReadAfterGoAwayShouldBeIgnored() throws Exception {
when(connection.isGoAwaySent()).thenReturn(true);
when(remote.isGoAwayReceived()).thenReturn(true);
decode().onWindowUpdateRead(ctx, STREAM_ID, 10);
verify(outboundFlow, never()).updateOutboundWindowSize(anyInt(), anyInt());
verify(observer, never()).onWindowUpdateRead(eq(ctx), anyInt(), anyInt());
}
@Test
public void windowUpdateReadForUnknownStreamShouldBeIgnored() throws Exception {
@Test(expected = Http2Exception.class)
public void windowUpdateReadForUnknownStreamShouldThrow() throws Exception {
when(connection.requireStream(5)).thenThrow(protocolError(""));
decode().onWindowUpdateRead(ctx, 5, 10);
verify(outboundFlow, never()).updateOutboundWindowSize(anyInt(), anyInt());
verify(observer, never()).onWindowUpdateRead(eq(ctx), anyInt(), anyInt());
}
@Test
@ -384,18 +386,17 @@ public class DelegatingHttp2ConnectionHandlerTest {
}
@Test
public void rstStreamReadAfterGoAwayShouldBeIgnored() throws Exception {
when(connection.isGoAwaySent()).thenReturn(true);
public void rstStreamReadAfterGoAwayShouldSucceed() throws Exception {
when(remote.isGoAwayReceived()).thenReturn(true);
decode().onRstStreamRead(ctx, STREAM_ID, PROTOCOL_ERROR.code());
verify(stream, never()).close();
verify(observer, never()).onRstStreamRead(eq(ctx), anyInt(), anyLong());
verify(stream).close();
verify(observer).onRstStreamRead(eq(ctx), anyInt(), anyLong());
}
@Test
public void rstStreamReadForUnknownStreamShouldBeIgnored() throws Exception {
@Test(expected = Http2Exception.class)
public void rstStreamReadForUnknownStreamShouldThrow() throws Exception {
when(connection.requireStream(5)).thenThrow(protocolError(""));
decode().onRstStreamRead(ctx, 5, PROTOCOL_ERROR.code());
verify(stream, never()).close();
verify(observer, never()).onRstStreamRead(eq(ctx), anyInt(), anyLong());
}
@Test
@ -455,8 +456,8 @@ public class DelegatingHttp2ConnectionHandlerTest {
@Test
public void goAwayShouldReadShouldUpdateConnectionState() throws Exception {
decode().onGoAwayRead(ctx, 1, 2, EMPTY_BUFFER);
verify(connection).goAwayReceived();
decode().onGoAwayRead(ctx, 1, 2L, EMPTY_BUFFER);
verify(local).goAwayReceived(1);
verify(observer).onGoAwayRead(eq(ctx), eq(1), eq(2L), eq(EMPTY_BUFFER));
}

View File

@ -0,0 +1,245 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.util.collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
* Utility methods for collections.
*/
public final class Collections {
private static final IntObjectMap<Object> EMPTY_INT_OBJECT_MAP = new EmptyIntObjectMap();
private Collections() {
}
/**
* Returns an unmodifiable empty {@link IntObjectMap}.
*/
@SuppressWarnings("unchecked")
public static <V> IntObjectMap<V> emptyIntObjectMap() {
return (IntObjectMap<V>) EMPTY_INT_OBJECT_MAP;
}
/**
* Creates an unmodifiable wrapper around the given map.
*/
public static <V> IntObjectMap<V> unmodifiable(final IntObjectMap<V> map) {
return new UnmodifiableIntObjectMap<V>(map);
}
/**
* An empty map. All operations that attempt to modify the map are unsupported.
*
* @param <V> the value type for the map.
*/
private static final class EmptyIntObjectMap implements IntObjectMap<Object> {
@Override
public Object get(int key) {
return null;
}
@Override
public Object put(int key, Object value) {
throw new UnsupportedOperationException("put");
}
@Override
public void putAll(IntObjectMap<Object> sourceMap) {
throw new UnsupportedOperationException("putAll");
}
@Override
public Object remove(int key) {
throw new UnsupportedOperationException("remove");
}
@Override
public int size() {
return 0;
}
@Override
public boolean isEmpty() {
return true;
}
@Override
public void clear() {
// Do nothing.
}
@Override
public boolean containsKey(int key) {
return false;
}
@Override
public boolean containsValue(Object value) {
return false;
}
@Override
public Iterable<Entry<Object>> entries() {
return java.util.Collections.emptySet();
}
@Override
public int[] keys() {
return new int[0];
}
@Override
public Object[] values(Class<Object> clazz) {
return new Object[0];
}
}
/**
* An unmodifiable wrapper around a {@link IntObjectMap}.
*
* @param <V> the value type stored in the map.
*/
private static final class UnmodifiableIntObjectMap<V> implements IntObjectMap<V>,
Iterable<IntObjectMap.Entry<V>> {
final IntObjectMap<V> map;
UnmodifiableIntObjectMap(IntObjectMap<V> map) {
this.map = map;
}
@Override
public V get(int key) {
return map.get(key);
}
@Override
public V put(int key, V value) {
throw new UnsupportedOperationException("put");
}
@Override
public void putAll(IntObjectMap<V> sourceMap) {
throw new UnsupportedOperationException("putAll");
}
@Override
public V remove(int key) {
throw new UnsupportedOperationException("remove");
}
@Override
public int size() {
return map.size();
}
@Override
public boolean isEmpty() {
return map.isEmpty();
}
@Override
public void clear() {
throw new UnsupportedOperationException("clear");
}
@Override
public boolean containsKey(int key) {
return map.containsKey(key);
}
@Override
public boolean containsValue(V value) {
return map.containsValue(value);
}
@Override
public Iterable<Entry<V>> entries() {
return this;
}
@Override
public Iterator<Entry<V>> iterator() {
return new IteratorImpl(map.entries().iterator());
}
@Override
public int[] keys() {
return map.keys();
}
@Override
public V[] values(Class<V> clazz) {
return map.values(clazz);
}
/**
* Unmodifiable wrapper for an iterator.
*/
private class IteratorImpl implements Iterator<Entry<V>> {
final Iterator<Entry<V>> iter;
IteratorImpl(Iterator<Entry<V>> iter) {
this.iter = iter;
}
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public Entry<V> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return new EntryImpl(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException("remove");
}
}
/**
* Unmodifiable wrapper for an entry.
*/
private class EntryImpl implements Entry<V> {
final Entry<V> entry;
EntryImpl(Entry<V> entry) {
this.entry = entry;
}
@Override
public int key() {
return entry.key();
}
@Override
public V value() {
return entry.value();
}
@Override
public void setValue(V value) {
throw new UnsupportedOperationException("setValue");
}
}
};
}

View File

@ -74,6 +74,9 @@ public class IntObjectHashMap<V> implements IntObjectMap<V>, Iterable<IntObjectM
this.loadFactor = loadFactor;
// Adjust the initial capacity if necessary.
initialCapacity = adjustCapacity(initialCapacity);
// Allocate the arrays.
states = new byte[initialCapacity];
keys = new int[initialCapacity];
@ -336,8 +339,7 @@ public class IntObjectHashMap<V> implements IntObjectMap<V>, Iterable<IntObjectM
if (size > maxSize) {
// Need to grow the arrays.
// TODO: consider using the next prime greater than capacity * 2.
rehash(capacity() * 2);
rehash(adjustCapacity(capacity() * 2));
} else if (available == 0) {
// Open addressing requires that we have at least 1 slot available. Need to refresh
// the arrays to clear any removed elements.
@ -345,6 +347,14 @@ public class IntObjectHashMap<V> implements IntObjectMap<V>, Iterable<IntObjectM
}
}
/**
* Adjusts the given capacity value to ensure that it's odd. Even capacities can break probing.
* TODO: would be better to ensure it's prime as well.
*/
private int adjustCapacity(int capacity) {
return capacity |= 1;
}
/**
* Marks the entry at the given index position as {@link REMOVED} and sets the value to
* {@code null}.