HTTP/2 limit streams in all states

Motivation:
SETTINGS_MAX_CONCURRENT_STREAMS does not apply to idle streams and thus we do not apply any explicit limitations on how many idle streams can be created. This may allow a peer to consume an undesirable amount of resources.

Modifications:
- Each Endpoint should enforce a limit for streams in a any state. By default this limit will be the same as SETTINGS_MAX_CONCURRENT_STREAMS but can be overridden if necessary.

Result:
There is now a limit to how many IDLE streams can be created.
This commit is contained in:
Scott Mitchell 2016-08-09 13:10:02 -07:00
parent 47d55339c9
commit 765e944d4d
7 changed files with 79 additions and 29 deletions

View File

@ -518,6 +518,7 @@ public class DefaultHttp2Connection implements Http2Connection {
state = CLOSED;
--createdBy().numStreams;
activeStreams.deactivate(this, itr);
return this;
}
@ -827,8 +828,10 @@ public class DefaultHttp2Connection implements Http2Connection {
private boolean pushToAllowed = true;
private F flowController;
private int maxActiveStreams;
private int maxStreams;
// Fields accessed by inner classes
int numActiveStreams;
int numStreams;
DefaultEndpoint(boolean server) {
this.server = server;
@ -848,7 +851,7 @@ public class DefaultHttp2Connection implements Http2Connection {
// Push is disallowed by default for servers and allowed for clients.
pushToAllowed = !server;
maxActiveStreams = Integer.MAX_VALUE;
maxStreams = maxActiveStreams = Integer.MAX_VALUE;
}
@Override
@ -861,12 +864,12 @@ public class DefaultHttp2Connection implements Http2Connection {
nextReservationStreamId = streamId;
}
nextStreamIdToCreate = streamId + 2;
++numStreams;
}
@Override
public boolean isValidStreamId(int streamId) {
boolean even = (streamId & 1) == 0;
return streamId > 0 && server == even;
return streamId > 0 && server == ((streamId & 1) == 0);
}
@Override
@ -876,7 +879,7 @@ public class DefaultHttp2Connection implements Http2Connection {
@Override
public boolean canOpenStream() {
return numActiveStreams + 1 <= maxActiveStreams;
return numActiveStreams < maxActiveStreams;
}
private DefaultStream createStream(int streamId, State state) throws Http2Exception {
@ -980,7 +983,17 @@ public class DefaultHttp2Connection implements Http2Connection {
}
@Override
public void maxActiveStreams(int maxActiveStreams) {
public int maxStreams() {
return maxStreams;
}
@Override
public void maxStreams(int maxActiveStreams, int maxStreams) throws Http2Exception {
if (maxStreams < maxActiveStreams) {
throw connectionError(PROTOCOL_ERROR, "maxStream[%d] streams must be >= maxActiveStreams[%d]",
maxStreams, maxActiveStreams);
}
this.maxStreams = maxStreams;
this.maxActiveStreams = maxActiveStreams;
}
@ -1035,8 +1048,12 @@ public class DefaultHttp2Connection implements Http2Connection {
if (nextStreamIdToCreate <= 0) {
throw connectionError(REFUSED_STREAM, "Stream IDs are exhausted for this endpoint.");
}
if ((state.localSideOpen() || state.remoteSideOpen()) && !canOpenStream()) {
throw connectionError(REFUSED_STREAM, "Maximum active streams violated for this endpoint.");
if (state.localSideOpen() || state.remoteSideOpen()) {
if (!canOpenStream()) {
throw connectionError(REFUSED_STREAM, "Maximum active streams violated for this endpoint.");
}
} else if (numStreams == maxStreams) {
throw streamError(streamId, REFUSED_STREAM, "Maximum streams violated for this endpoint.");
}
if (isClosed()) {
throw connectionError(INTERNAL_ERROR, "Attempted to create stream id %d after connection was closed",

View File

@ -386,7 +386,8 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
Long maxConcurrentStreams = settings.maxConcurrentStreams();
if (maxConcurrentStreams != null) {
int value = (int) Math.min(maxConcurrentStreams, Integer.MAX_VALUE);
connection.remote().maxActiveStreams(value);
// By default just enforce the SETTINGS_MAX_CONCURRENT_STREAMS limit for stream in all states.
connection.remote().maxStreams(value, value);
}
Long headerTableSize = settings.headerTableSize();

View File

@ -29,6 +29,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGH
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Integer.MAX_VALUE;
import static java.lang.Math.min;
/**
@ -87,12 +88,13 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
Long maxConcurrentStreams = settings.maxConcurrentStreams();
if (maxConcurrentStreams != null) {
connection.local().maxActiveStreams((int) min(maxConcurrentStreams, Integer.MAX_VALUE));
// TODO(scott): define an extension setting so we can communicate/enforce the maxStreams limit locally.
connection.local().maxStreams((int) min(maxConcurrentStreams, MAX_VALUE), MAX_VALUE);
}
Long headerTableSize = settings.headerTableSize();
if (headerTableSize != null) {
outboundHeaderTable.maxHeaderTableSize((int) min(headerTableSize, Integer.MAX_VALUE));
outboundHeaderTable.maxHeaderTableSize((int) min(headerTableSize, MAX_VALUE));
}
Integer maxHeaderListSize = settings.maxHeaderListSize();
@ -384,7 +386,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
FlowControlledData nextData;
if (FlowControlledData.class != next.getClass() ||
Integer.MAX_VALUE - (nextData = (FlowControlledData) next).size() < size()) {
MAX_VALUE - (nextData = (FlowControlledData) next).size() < size()) {
return false;
}
nextData.queue.copyTo(queue);

View File

@ -276,7 +276,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
private BooleanSupplier isWritableSupplier = new BooleanSupplier() {
@Override
public boolean get() throws Exception {
return windowSize() - pendingBytes() > 0;
return windowSize() > pendingBytes();
}
};

View File

@ -195,7 +195,6 @@ public interface Http2Connection {
* <ul>
* <li>The requested stream ID is not the next sequential ID for this endpoint.</li>
* <li>The stream already exists.</li>
* <li>{@link #isExhausted()} is {@code true}</li>
* <li>{@link #canOpenStream()} is {@code false}.</li>
* <li>The connection is marked as going away.</li>
* </ul>
@ -257,11 +256,22 @@ public interface Http2Connection {
int maxActiveStreams();
/**
* Sets the maximum number of streams (created by this endpoint) that are allowed to be active at once.
* This is the {@code SETTINGS_MAX_CONCURRENT_STREAMS} value sent from the opposite endpoint to
* restrict stream creation by this endpoint.
* The limit imposed by {@link #maxActiveStreams()} does not apply to streams in the IDLE state. Since IDLE
* streams can still consume resources this limit will include streams in all states.
* @return The maximum number of streams that can exist at any given time.
*/
void maxActiveStreams(int maxActiveStreams);
int maxStreams();
/**
* Sets the limit for {@code SETTINGS_MAX_CONCURRENT_STREAMS} and the limit for {@link #maxStreams()}.
* @param maxActiveStreams The maximum number of streams (created by this endpoint) that are allowed to be
* active at once. This is the {@code SETTINGS_MAX_CONCURRENT_STREAMS} value sent from the opposite endpoint to
* restrict stream creation by this endpoint.
* @param maxStreams The limit imposed by {@link #maxActiveStreams()} does not apply to streams in the IDLE
* state. Since IDLE streams can still consume resources this limit will include streams in all states.
* @throws Http2Exception if {@code maxStreams < maxActiveStream}.
*/
void maxStreams(int maxActiveStreams, int maxStreams) throws Http2Exception;
/**
* Gets the ID of the stream last successfully created by this endpoint.

View File

@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
import static java.lang.Integer.MAX_VALUE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@ -392,22 +393,22 @@ public class DefaultHttp2ConnectionTest {
@Test(expected = Http2NoMoreStreamIdsException.class)
public void serverRemoteIncrementAndGetStreamShouldRespectOverflow() throws Http2Exception {
incrementAndGetStreamShouldRespectOverflow(server.remote(), Integer.MAX_VALUE);
incrementAndGetStreamShouldRespectOverflow(server.remote(), MAX_VALUE);
}
@Test(expected = Http2NoMoreStreamIdsException.class)
public void serverLocalIncrementAndGetStreamShouldRespectOverflow() throws Http2Exception {
incrementAndGetStreamShouldRespectOverflow(server.local(), Integer.MAX_VALUE - 1);
incrementAndGetStreamShouldRespectOverflow(server.local(), MAX_VALUE - 1);
}
@Test(expected = Http2NoMoreStreamIdsException.class)
public void clientRemoteIncrementAndGetStreamShouldRespectOverflow() throws Http2Exception {
incrementAndGetStreamShouldRespectOverflow(client.remote(), Integer.MAX_VALUE - 1);
incrementAndGetStreamShouldRespectOverflow(client.remote(), MAX_VALUE - 1);
}
@Test(expected = Http2NoMoreStreamIdsException.class)
public void clientLocalIncrementAndGetStreamShouldRespectOverflow() throws Http2Exception {
incrementAndGetStreamShouldRespectOverflow(client.local(), Integer.MAX_VALUE);
incrementAndGetStreamShouldRespectOverflow(client.local(), MAX_VALUE);
}
@Test(expected = Http2Exception.class)
@ -426,14 +427,27 @@ public class DefaultHttp2ConnectionTest {
}
@Test(expected = Http2Exception.class)
public void createShouldThrowWhenMaxAllowedStreamsExceeded() throws Http2Exception {
server.local().maxActiveStreams(0);
public void createShouldThrowWhenMaxAllowedStreamsOpenExceeded() throws Http2Exception {
server.local().maxStreams(0, 0);
server.local().createStream(2, true);
}
@Test(expected = Http2Exception.class)
public void createShouldThrowWhenMaxAllowedStreamsIdleExceeded() throws Http2Exception {
server.local().maxStreams(0, 0);
server.local().createIdleStream(2);
}
@Test(expected = Http2Exception.class)
public void createShouldThrowWhenMaxAllowedStreamsReservedExceeded() throws Http2Exception {
server.local().maxStreams(1, 1);
Http2Stream parent = server.local().createStream(2, false);
server.local().reservePushStream(4, parent);
}
@Test
public void createIdleShouldSucceedWhenMaxAllowedStreamsExceeded() throws Http2Exception {
server.local().maxActiveStreams(0);
public void createIdleShouldSucceedWhenMaxAllowedActiveStreamsExceeded() throws Http2Exception {
server.local().maxStreams(0, MAX_VALUE);
Http2Stream stream = server.local().createIdleStream(2);
// Opening should fail, however.
@ -442,6 +456,12 @@ public class DefaultHttp2ConnectionTest {
stream.open(false);
}
@Test(expected = Http2Exception.class)
public void createIdleShouldFailWhenMaxAllowedStreamsExceeded() throws Http2Exception {
server.local().maxStreams(0, 0);
server.local().createIdleStream(2);
}
@Test(expected = Http2Exception.class)
public void reserveWithPushDisallowedShouldThrow() throws Http2Exception {
Http2Stream stream = server.remote().createStream(3, true);
@ -490,13 +510,13 @@ public class DefaultHttp2ConnectionTest {
@SuppressWarnings("NumericOverflow")
@Test(expected = Http2Exception.class)
public void localStreamInvalidStreamIdShouldThrow() throws Http2Exception {
client.local().createStream(Integer.MAX_VALUE + 2, false);
client.local().createStream(MAX_VALUE + 2, false);
}
@SuppressWarnings("NumericOverflow")
@Test(expected = Http2Exception.class)
public void remoteStreamInvalidStreamIdShouldThrow() throws Http2Exception {
client.remote().createStream(Integer.MAX_VALUE + 1, false);
client.remote().createStream(MAX_VALUE + 1, false);
}
@Test

View File

@ -453,9 +453,9 @@ public class StreamBufferingEncoderTest {
}
@Test
public void closeShouldCancelAllBufferedStreams() {
public void closeShouldCancelAllBufferedStreams() throws Http2Exception {
encoder.writeSettingsAck(ctx, newPromise());
connection.local().maxActiveStreams(0);
connection.local().maxStreams(0, 0);
ChannelFuture f1 = encoderWriteHeaders(3, newPromise());
ChannelFuture f2 = encoderWriteHeaders(5, newPromise());